You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/29 21:46:07 UTC

kafka git commit: KAFKA-3133: Add putIfAbsent function to KeyValueStore

Repository: kafka
Updated Branches:
  refs/heads/trunk 92c35230f -> d501cc62d


KAFKA-3133: Add putIfAbsent function to KeyValueStore

guozhangwang

Author: Kim Christensen <ki...@mvno.dk>

Reviewers: Ismael Juma <is...@juma.me.uk>, Guozhang Wang <wa...@gmail.com>

Closes #912 from kichristensen/KAFKA-3133


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d501cc62
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d501cc62
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d501cc62

Branch: refs/heads/trunk
Commit: d501cc62ddcbbddcb114a40527d3e4f71f501286
Parents: 92c3523
Author: Kim Christensen <ki...@mvno.dk>
Authored: Mon Feb 29 12:46:03 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Feb 29 12:46:03 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/state/KeyValueStore.java      | 11 ++++++
 .../internals/InMemoryKeyValueLoggedStore.java  | 10 ++++++
 .../InMemoryKeyValueStoreSupplier.java          |  9 +++++
 .../streams/state/internals/MemoryLRUCache.java |  9 +++++
 .../state/internals/MeteredKeyValueStore.java   | 12 +++++++
 .../streams/state/internals/RocksDBStore.java   |  9 +++++
 .../internals/AbstractKeyValueStoreTest.java    | 37 ++++++++++++++++++++
 7 files changed, 97 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index d448044..f296230 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -51,6 +51,17 @@ public interface KeyValueStore<K, V> extends StateStore {
     abstract public void put(K key, V value);
 
     /**
+     * Update the value associated with this key, unless a value
+     * is already associated with the key.
+     *
+     * @param key The key to associate the value to
+     * @param value The value
+     * @return The old value (the store is left unchanged) or null if there is no such key.
+     * @throws NullPointerException If null is used for key or value.
+     */
+    abstract public V putIfAbsent(K key, V value);
+
+    /**
      * Update all the given key/value pairs
      *
      * @param entries A list of entries to put into the store.

http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index 596cc2b..d25faa8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -79,6 +79,16 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
+    public V putIfAbsent(K key, V value) { // delegates to the inner store and hands the key to the change logger only on a real insert
+        V originalValue = this.inner.putIfAbsent(key, value);
+        if (originalValue == null) { // null: the inner store had no value for the key, so the new value was written
+            changeLogger.add(key);
+            changeLogger.maybeLogChange(this.getter);
+        }
+        return originalValue; // the pre-existing value, or null when the put took effect
+    }
+
+    @Override
     public void putAll(List<KeyValue<K, V>> entries) {
         this.inner.putAll(entries);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 0665af2..b96a103 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -117,6 +117,15 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         }
 
         @Override
+        public V putIfAbsent(K key, V value) { // writes value only when get(key) currently returns null
+            V originalValue = get(key); // NOTE(review): get and put are separate calls; confirm callers do not need this to be atomic
+            if (originalValue == null) {
+                put(key, value);
+            }
+            return originalValue; // the pre-existing value, or null when the put took effect
+        }
+
+        @Override
         public void putAll(List<KeyValue<K, V>> entries) {
             for (KeyValue<K, V> entry : entries)
                 put(entry.key, entry.value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 2a8be8c..bd03f03 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -118,6 +118,15 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
+    public V putIfAbsent(K key, V value) { // writes value only when get(key) currently returns null
+        V originalValue = get(key); // NOTE(review): get and put are separate calls; confirm callers do not need this to be atomic
+        if (originalValue == null) {
+            put(key, value);
+        }
+        return originalValue; // the pre-existing value, or null when the put took effect
+    }
+
+    @Override
     public void putAll(List<KeyValue<K, V>> entries) {
         for (KeyValue<K, V> entry : entries)
             put(entry.key, entry.value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 46feb58..9808c04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -43,6 +43,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     protected final Time time;
 
     private Sensor putTime;
+    private Sensor putIfAbsentTime;
     private Sensor getTime;
     private Sensor deleteTime;
     private Sensor putAllTime;
@@ -69,6 +70,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         final String name = name();
         this.metrics = context.metrics();
         this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
+        this.putIfAbsentTime = this.metrics.addLatencySensor(metricScope, name, "put-if-absent");
         this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
         this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete");
         this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all");
@@ -112,6 +114,16 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
+    public V putIfAbsent(K key, V value) { // delegates to the inner store and records the call's latency on the put-if-absent sensor
+        long startNs = time.nanoseconds();
+        try {
+            return this.inner.putIfAbsent(key, value);
+        } finally { // runs on both normal return and exception, so the latency is always recorded
+            this.metrics.recordLatency(this.putIfAbsentTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
     public void putAll(List<KeyValue<K, V>> entries) {
         long startNs = time.nanoseconds();
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 999c9ec..c295aea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -256,6 +256,15 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
     }
 
+    @Override
+    public V putIfAbsent(K key, V value) { // writes value only when get(key) currently returns null
+        V originalValue = get(key); // NOTE(review): get and put are separate calls; confirm callers do not need this to be atomic
+        if (originalValue == null) {
+            put(key, value);
+        }
+        return originalValue; // the pre-existing value, or null when the put took effect
+    }
+
     private void putInternal(byte[] rawKey, byte[] rawValue) {
         if (rawValue == null) {
             try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d501cc62/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index ee343e8..fb0efc9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -192,4 +192,41 @@ public abstract class AbstractKeyValueStoreTest {
         }
     }
 
+
+
+    @Test
+    public void testPutIfAbsent() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        try {
+
+            // Verify that the store reads and writes correctly ...
+            assertNull(store.putIfAbsent(0, "zero"));
+            assertNull(store.putIfAbsent(1, "one"));
+            assertNull(store.putIfAbsent(2, "two"));
+            assertNull(store.putIfAbsent(4, "four"));
+            assertEquals("four", store.putIfAbsent(4, "unexpected value")); // existing mapping wins: the old value is returned
+            assertEquals(4, driver.sizeOf(store)); // the rejected second put for key 4 must not add an entry
+            assertEquals("zero", store.get(0));
+            assertEquals("one", store.get(1));
+            assertEquals("two", store.get(2));
+            assertNull(store.get(3)); // key 3 was never written
+            assertEquals("four", store.get(4)); // "unexpected value" must not have replaced "four"
+
+            // Flush the store and verify all current entries were properly flushed ...
+            store.flush();
+            assertEquals("zero", driver.flushedEntryStored(0));
+            assertEquals("one", driver.flushedEntryStored(1));
+            assertEquals("two", driver.flushedEntryStored(2));
+            assertEquals("four", driver.flushedEntryStored(4));
+
+            assertEquals(false, driver.flushedEntryRemoved(0));
+            assertEquals(false, driver.flushedEntryRemoved(1));
+            assertEquals(false, driver.flushedEntryRemoved(2));
+            assertEquals(false, driver.flushedEntryRemoved(4));
+        } finally {
+            store.close();
+        }
+    }
 }