You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/05/07 16:51:39 UTC
samza git commit: SAMZA-1691: Support get iterable from KeyValueStore
Repository: samza
Updated Branches:
refs/heads/master 2e461a880 -> fa49e7228
SAMZA-1691: Support get iterable from KeyValueStore
Right now for KeyValueStore we have a range query to return an iterator. For usage in BEAM, we need a iterable which will 1) create the snapshot when called, and 2) create an iterator when needed. Add the iterate() function in KeyValueStore to support it. It's implemented as follows:
1) for rocksDb, it will create the iterator when it's called, which will has a snapshot of the elements. Then every time when the iterator is needed, we will seek the iterator from beginning;
2) for inMemoryDb, it will create the snapshot submap when iterate() is called. The submap is an iterable and it can return a new iterator when needed.
Author: xinyuiscool <xi...@linkedin.com>
Reviewers: Boris S <sb...@apache.org>
Closes #492 from xinyuiscool/SAMZA-1691
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fa49e722
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fa49e722
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fa49e722
Branch: refs/heads/master
Commit: fa49e7228319e845bb81336b97af8e28e04524c9
Parents: 2e461a8
Author: xinyuiscool <xi...@linkedin.com>
Authored: Mon May 7 09:51:28 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Mon May 7 09:51:28 2018 -0700
----------------------------------------------------------------------
.../samza/storage/kv/KeyValueIterable.java | 24 ++++++
.../apache/samza/storage/kv/KeyValueStore.java | 14 +++
.../operators/util/InternalInMemoryStore.java | 6 ++
.../operators/impl/store/TestInMemoryStore.java | 12 +++
.../kv/inmemory/InMemoryKeyValueStore.scala | 12 ++-
.../kv/inmemory/TestInMemoryKeyValueStore.java | 81 ++++++++++++++++++
.../samza/storage/kv/RocksDbKeyValueStore.scala | 33 +++++++
.../kv/TestRocksDbKeyValueStoreJava.java | 90 ++++++++++++++++++++
.../samza/storage/kv/AccessLoggedStore.scala | 9 +-
.../apache/samza/storage/kv/CachedStore.scala | 4 +
.../storage/kv/KeyValueStorageEngine.scala | 7 ++
.../kv/KeyValueStorageEngineMetrics.scala | 2 +
.../apache/samza/storage/kv/LoggedStore.scala | 3 +
.../storage/kv/NullSafeKeyValueStore.scala | 6 ++
.../storage/kv/SerializedKeyValueStore.scala | 11 +++
.../samza/storage/kv/MockKeyValueStore.scala | 4 +
16 files changed, 316 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java
new file mode 100644
index 0000000..8fd00ed
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueIterable.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+public interface KeyValueIterable<K, V> extends Iterable<Entry<K, V>> {
+ KeyValueIterator<K, V> iterator();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index 18a89ec..3f216bd 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -111,6 +111,20 @@ public interface KeyValueStore<K, V> {
KeyValueIterator<K, V> range(K from, K to);
/**
+ * Returns an iterable for a sorted range of entries specified by [{@code from}, {@code to}).
+ * Note that we snapshot the iterator when the iterable is created from this function, and
+ * the iteration results is guaranteed to reflect the snapshot if only one iterator is in use at a time.
+ *
+ * @param from the key specifying the low endpoint (inclusive) of the keys in the returned range.
+ * @param to the key specifying the high endpoint (exclusive) of the keys in the returned range.
+ * @return an iterable for the specified key range.
+ * @throws NullPointerException if null is used for {@code from} or {@code to}.
+ */
+ default KeyValueIterable<K, V> iterate(K from, K to) {
+ return () -> range(from, to);
+ }
+
+ /**
* Returns an iterator for all entries in this key-value store.
*
* <p><b>API Note:</b> The returned iterator MUST be closed after use.</p>
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
index b8672c6..7360474 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
@@ -20,6 +20,7 @@
package org.apache.samza.operators.util;
import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterable;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
@@ -94,6 +95,11 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
}
@Override
+ public KeyValueIterable<K, V> iterate(K from, K to) {
+ throw new UnsupportedOperationException("iterate() is not supported in " + InternalInMemoryStore.class.getName());
+ }
+
+ @Override
public KeyValueIterator<K, V> all() {
final Iterator<Map.Entry<K, V>> iterator = map.entrySet().iterator();
return new KeyValueIterator<K, V>() {
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
index d16954d..e331703 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
@@ -21,6 +21,7 @@ package org.apache.samza.operators.impl.store;
import com.google.common.primitives.UnsignedBytes;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterable;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
@@ -99,6 +100,17 @@ public class TestInMemoryStore<K, V> implements KeyValueStore<K, V> {
}
@Override
+ public KeyValueIterable<K, V> iterate(K from, K to) {
+ final ConcurrentNavigableMap<byte[], byte[]> values = map.subMap(keySerde.toBytes(from), keySerde.toBytes(to));
+ return new KeyValueIterable<K, V>() {
+ @Override
+ public KeyValueIterator<K, V> iterator() {
+ return new InMemoryIterator<>(values.entrySet().iterator(), keySerde, valSerde);
+ }
+ };
+ }
+
+ @Override
public KeyValueIterator<K, V> all() {
return new InMemoryIterator(map.entrySet().iterator(), keySerde, valSerde);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index 7b83163..decaee0 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -20,7 +20,7 @@ package org.apache.samza.storage.kv.inmemory
import com.google.common.primitives.UnsignedBytes
import org.apache.samza.util.Logging
-import org.apache.samza.storage.kv.{KeyValueStoreMetrics, KeyValueIterator, Entry, KeyValueStore}
+import org.apache.samza.storage.kv._
import java.util
/**
@@ -112,4 +112,14 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
}
found
}
+
+ override def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = {
+ // snapshot the iterable
+ val entries = underlying.subMap(from, to).entrySet()
+ new KeyValueIterable[Array[Byte], Array[Byte]] {
+ override def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+ new InMemoryIterator(entries.iterator())
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
new file mode 100644
index 0000000..0fa5807
--- /dev/null
+++ b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryKeyValueStore.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.inmemory;
+
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueIterable;
+import org.apache.samza.storage.kv.KeyValueStoreMetrics;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestInMemoryKeyValueStore {
+ @Test
+ public void testIterate() throws Exception {
+ InMemoryKeyValueStore store = new InMemoryKeyValueStore(
+ new KeyValueStoreMetrics("testInMemory", new MetricsRegistryMap()));
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ String prefix = "prefix";
+ for(int i = 0; i < 100; i++) {
+ store.put(genKey(outputStream, prefix, i), genValue());
+ }
+
+ byte[] firstKey = genKey(outputStream, prefix, 0);
+ byte[] lastKey = genKey(outputStream, prefix, 100);
+ KeyValueIterable<byte[], byte[]> iterable = store.iterate(firstKey, lastKey);
+ // Make sure the cached Iterable won't change when new elements are added
+ store.put(genKey(outputStream, prefix, 200), genValue());
+ assertTrue(Iterators.size(iterable.iterator()) == 100);
+
+ List<Integer> keys = new ArrayList<>();
+ for (Entry<byte[], byte[]> entry : iterable) {
+ int key = Ints.fromByteArray(Arrays.copyOfRange(entry.getKey(), prefix.getBytes().length, entry.getKey().length));
+ keys.add(key);
+ }
+ assertEquals(keys, IntStream.rangeClosed(0, 99).boxed().collect(Collectors.toList()));
+
+ outputStream.close();
+ store.close();
+ }
+
+ private byte[] genKey(ByteArrayOutputStream outputStream, String prefix, int i) throws Exception {
+ outputStream.reset();
+ outputStream.write(prefix.getBytes());
+ outputStream.write(Ints.toByteArray(i));
+ return outputStream.toByteArray();
+ }
+
+ private byte[] genValue() {
+ int randomVal = ThreadLocalRandom.current().nextInt(0, 100000);
+ return Ints.toByteArray(randomVal);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 856cc4e..06fb584 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -20,6 +20,7 @@
package org.apache.samza.storage.kv
import java.io.File
+import java.util
import java.util.Comparator
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock
@@ -203,6 +204,30 @@ class RocksDbKeyValueStore(
new RocksDbIterator(iter)
}
+ def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = {
+ //snapshot the iterator
+ val snapshotIter : RocksDbRangeIterator = range(from, to).asInstanceOf[RocksDbRangeIterator]
+ new KeyValueIterable[Array[Byte], Array[Byte]] {
+ var iter:RocksDbRangeIterator = null
+
+ def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
+ this.synchronized {
+ if (iter == null) {
+ iter = snapshotIter
+ iter
+ } else if(iter.isOpen() && !iter.hasNext()) {
+ // use the cached iterator and reset the position to the beginning
+ iter.seek(from)
+ iter
+ } else {
+ // we need to create a new iterator since the cached one is still in use or already closed
+ range(from, to)
+ }
+ }
+ }
+ }
+ }
+
def flush(): Unit = ifOpen {
metrics.flushes.inc
trace("Flushing store: %s" format storeName)
@@ -248,6 +273,10 @@ class RocksDbKeyValueStore(
iter.close()
}
+ def isOpen() = ifOpen {
+ open
+ }
+
override def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove")
override def hasNext() = ifOpen(iter.isValid)
@@ -301,6 +330,10 @@ class RocksDbKeyValueStore(
override def hasNext() = ifOpen {
super.hasNext() && comparator.compare(peekKey(), to) < 0
}
+
+ def seek(key: Array[Byte]) = {
+ iter.seek(key)
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java
new file mode 100644
index 0000000..98688c6
--- /dev/null
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbKeyValueStoreJava.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.junit.Test;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.WriteOptions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestRocksDbKeyValueStoreJava {
+ @Test
+ public void testIterate() throws Exception {
+ Config config = new MapConfig();
+ Options options = new Options();
+ options.setCreateIfMissing(true);
+
+ File dbDir = new File(System.getProperty("java.io.tmpdir") + "/dbStore" + System.currentTimeMillis());
+ RocksDbKeyValueStore store = new RocksDbKeyValueStore(dbDir, options, config, false, "dbStore",
+ new WriteOptions(), new FlushOptions(), new KeyValueStoreMetrics("dbStore", new MetricsRegistryMap()));
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ String prefix = "prefix";
+ for(int i = 0; i < 100; i++) {
+ store.put(genKey(outputStream, prefix, i), genValue());
+ }
+
+ byte[] firstKey = genKey(outputStream, prefix, 0);
+ byte[] lastKey = genKey(outputStream, prefix, 1000);
+ KeyValueIterable<byte[], byte[]> iterable = store.iterate(firstKey, lastKey);
+ // Make sure the cached Iterable won't change when new elements are added
+ store.put(genKey(outputStream, prefix, 200), genValue());
+ assertTrue(Iterators.size(iterable.iterator()) == 100);
+
+ List<Integer> keys = new ArrayList<>();
+ for (Entry<byte[], byte[]> entry : iterable) {
+ int key = Ints.fromByteArray(Arrays.copyOfRange(entry.getKey(), prefix.getBytes().length, entry.getKey().length));
+ keys.add(key);
+ }
+ assertEquals(keys, IntStream.rangeClosed(0, 99).boxed().collect(Collectors.toList()));
+
+ outputStream.close();
+ store.close();
+ }
+
+ private byte[] genKey(ByteArrayOutputStream outputStream, String prefix, int i) throws Exception {
+ outputStream.reset();
+ outputStream.write(prefix.getBytes());
+ outputStream.write(Ints.toByteArray(i));
+ return outputStream.toByteArray();
+ }
+
+ private byte[] genValue() {
+ int randomVal = ThreadLocalRandom.current().nextInt(0, 100000);
+ return Ints.toByteArray(randomVal);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
index 879a144..67fd011 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
@@ -41,6 +41,7 @@ class AccessLoggedStore[K, V](
val WRITE = 2
val DELETE = 3
val RANGE = 4
+ val ITERATE = 5
}
val streamName = storageConfig.getAccessLogStream(changelogSystemStreamPartition.getSystemStream.getStream)
@@ -91,6 +92,13 @@ class AccessLoggedStore[K, V](
store.all()
}
+ def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+ val list : util.ArrayList[K] = new util.ArrayList[K]()
+ list.add(from)
+ list.add(to)
+ logAccess(DBOperation.ITERATE, serializeKeys(list), store.iterate(from, to))
+ }
+
def close(): Unit = {
trace("Closing accessLogged store.")
@@ -151,5 +159,4 @@ class AccessLoggedStore[K, V](
val bytes = keySerde.toBytes(key)
bytes
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index d40999a..29efacb 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -286,6 +286,10 @@ class CachedStore[K, V](
}
def hasArrayKeys = containsArrayKeys
+
+ override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+ store.iterate(from, to)
+ }
}
private case class CacheEntry[K, V](var value: V, var dirty: mutable.DoubleLinkedList[K])
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 5f7bbd8..b055ca5 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -160,4 +160,11 @@ class KeyValueStorageEngine[K, V](
}
override def getStoreProperties: StoreProperties = storeProperties
+
+ override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+ updateTimer(metrics.iterateNs) {
+ metrics.iterates.inc
+ wrapperStore.iterate(from, to)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index 8c42c7c..4162292 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -33,6 +33,7 @@ class KeyValueStorageEngineMetrics(
val puts = newCounter("puts")
val deletes = newCounter("deletes")
val flushes = newCounter("flushes")
+ val iterates = newCounter("iterates")
val restoredMessages = newCounter("messages-restored") //Deprecated
val restoredMessagesGauge = newGauge("restored-messages", 0)
@@ -47,6 +48,7 @@ class KeyValueStorageEngineMetrics(
val flushNs = newTimer("flush-ns")
val allNs = newTimer("all-ns")
val rangeNs = newTimer("range-ns")
+ val iterateNs = newTimer("iterate-ns")
override def getPrefix = storeName + "-"
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index e0c7a31..0d013f8 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -114,4 +114,7 @@ class LoggedStore[K, V](
store.close
}
+ override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+ store.iterate(from, to)
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 7adffa9..1978710 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -89,4 +89,10 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
throw new NullPointerException(msg)
}
}
+
+ override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+ notNull(from, NullKeyErrorMessage)
+ notNull(to, NullKeyErrorMessage)
+ store.iterate(from, to)
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 16dd980..5f59143 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -148,4 +148,15 @@ class SerializedKeyValueStore[K, V](
}
bytes
}
+
+ override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
+ val fromBytes = toBytesOrNull(from, keySerde)
+ val toBytes = toBytesOrNull(to, keySerde)
+ val iterable = store.iterate(fromBytes, toBytes)
+ new KeyValueIterable[K, V] {
+ override def iterator(): KeyValueIterator[K, V] = {
+ new DeserializingIterator(iterable.iterator())
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fa49e722/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index f66dc04..8affd5e 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -69,4 +69,8 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
override def flush() {} // no-op
override def close() { kvMap.clear() }
+
+ override def iterate(from: String, to: String): KeyValueIterable[String, String] = {
+ throw new UnsupportedOperationException("iterator() not supported")
+ }
}
\ No newline at end of file