You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/29 21:46:07 UTC
kafka git commit: KAFKA-3133: Add putIfAbsent function to
KeyValueStore
Repository: kafka
Updated Branches:
refs/heads/trunk 92c35230f -> d501cc62d
KAFKA-3133: Add putIfAbsent function to KeyValueStore
guozhangwang
Author: Kim Christensen <ki...@mvno.dk>
Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>
Closes #912 from kichristensen/KAFKA-3133
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d501cc62
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d501cc62
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d501cc62
Branch: refs/heads/trunk
Commit: d501cc62ddcbbddcb114a40527d3e4f71f501286
Parents: 92c3523
Author: Kim Christensen <ki...@mvno.dk>
Authored: Mon Feb 29 12:46:03 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Feb 29 12:46:03 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/state/KeyValueStore.java | 11 ++++++
.../internals/InMemoryKeyValueLoggedStore.java | 10 ++++++
.../InMemoryKeyValueStoreSupplier.java | 9 +++++
.../streams/state/internals/MemoryLRUCache.java | 9 +++++
.../state/internals/MeteredKeyValueStore.java | 12 +++++++
.../streams/state/internals/RocksDBStore.java | 9 +++++
.../internals/AbstractKeyValueStoreTest.java | 37 ++++++++++++++++++++
7 files changed, 97 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index d448044..f296230 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -51,6 +51,17 @@ public interface KeyValueStore<K, V> extends StateStore {
abstract public void put(K key, V value);
/**
+ * Update the value associated with this key, unless a value
+ * is already associated with the key
+ *
+ * @param key The key to associate the value to
+ * @param value The value
+ * @return The value already associated with the key, or null if the key was absent and the given value was stored.
+ * @throws NullPointerException If null is used for key or value.
+ */
+ abstract public V putIfAbsent(K key, V value);
+
+ /**
* Update all the given key/value pairs
*
* @param entries A list of entries to put into the store.
http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index 596cc2b..d25faa8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -79,6 +79,16 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
}
@Override
+ public V putIfAbsent(K key, V value) {
+ V originalValue = this.inner.putIfAbsent(key, value);
+ if (originalValue == null) {
+ changeLogger.add(key);
+ changeLogger.maybeLogChange(this.getter);
+ }
+ return originalValue;
+ }
+
+ @Override
public void putAll(List<KeyValue<K, V>> entries) {
this.inner.putAll(entries);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 0665af2..b96a103 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -117,6 +117,15 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
@Override
+ public V putIfAbsent(K key, V value) {
+ V originalValue = get(key);
+ if (originalValue == null) {
+ put(key, value);
+ }
+ return originalValue;
+ }
+
+ @Override
public void putAll(List<KeyValue<K, V>> entries) {
for (KeyValue<K, V> entry : entries)
put(entry.key, entry.value);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 2a8be8c..bd03f03 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -118,6 +118,15 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
}
@Override
+ public V putIfAbsent(K key, V value) {
+ V originalValue = get(key);
+ if (originalValue == null) {
+ put(key, value);
+ }
+ return originalValue;
+ }
+
+ @Override
public void putAll(List<KeyValue<K, V>> entries) {
for (KeyValue<K, V> entry : entries)
put(entry.key, entry.value);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 46feb58..9808c04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -43,6 +43,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
protected final Time time;
private Sensor putTime;
+ private Sensor putIfAbsentTime;
private Sensor getTime;
private Sensor deleteTime;
private Sensor putAllTime;
@@ -69,6 +70,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
final String name = name();
this.metrics = context.metrics();
this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
+ this.putIfAbsentTime = this.metrics.addLatencySensor(metricScope, name, "put-if-absent");
this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete");
this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all");
@@ -112,6 +114,16 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
}
@Override
+ public V putIfAbsent(K key, V value) {
+ long startNs = time.nanoseconds();
+ try {
+ return this.inner.putIfAbsent(key, value);
+ } finally {
+ this.metrics.recordLatency(this.putIfAbsentTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
public void putAll(List<KeyValue<K, V>> entries) {
long startNs = time.nanoseconds();
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 999c9ec..c295aea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -256,6 +256,15 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
}
+ @Override
+ public V putIfAbsent(K key, V value) {
+ V originalValue = get(key);
+ if (originalValue == null) {
+ put(key, value);
+ }
+ return originalValue;
+ }
+
private void putInternal(byte[] rawKey, byte[] rawValue) {
if (rawValue == null) {
try {
http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index ee343e8..fb0efc9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -192,4 +192,41 @@ public abstract class AbstractKeyValueStoreTest {
}
}
+
+
+ @Test
+ public void testPutIfAbsent() {
+ // Create the test driver ...
+ KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+ try {
+
+ // Verify that the store reads and writes correctly ...
+ assertNull(store.putIfAbsent(0, "zero"));
+ assertNull(store.putIfAbsent(1, "one"));
+ assertNull(store.putIfAbsent(2, "two"));
+ assertNull(store.putIfAbsent(4, "four"));
+ assertEquals("four", store.putIfAbsent(4, "unexpected value"));
+ assertEquals(4, driver.sizeOf(store));
+ assertEquals("zero", store.get(0));
+ assertEquals("one", store.get(1));
+ assertEquals("two", store.get(2));
+ assertNull(store.get(3));
+ assertEquals("four", store.get(4));
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+ assertEquals("zero", driver.flushedEntryStored(0));
+ assertEquals("one", driver.flushedEntryStored(1));
+ assertEquals("two", driver.flushedEntryStored(2));
+ assertEquals("four", driver.flushedEntryStored(4));
+
+ assertEquals(false, driver.flushedEntryRemoved(0));
+ assertEquals(false, driver.flushedEntryRemoved(1));
+ assertEquals(false, driver.flushedEntryRemoved(2));
+ assertEquals(false, driver.flushedEntryRemoved(4));
+ } finally {
+ store.close();
+ }
+ }
}