You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/26 16:32:26 UTC

kafka git commit: KAFKA-3499: prevent array typed keys in KeyValueStore

Repository: kafka
Updated Branches:
  refs/heads/trunk e7d04c251 -> a02c8aaec


KAFKA-3499: prevent array typed keys in KeyValueStore

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Ismael Juma, Josh Gruenberg, Michael G. Noll, Ewen Cheslack-Postava

Closes #1229 from guozhangwang/K3499


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a02c8aae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a02c8aae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a02c8aae

Branch: refs/heads/trunk
Commit: a02c8aaecfbd13838c2a062bac1455da352028fe
Parents: e7d04c2
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Apr 26 07:32:21 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Apr 26 07:32:21 2016 -0700

----------------------------------------------------------------------
 .../common/serialization/BytesDeserializer.java |  35 ++++
 .../common/serialization/BytesSerializer.java   |  36 ++++
 .../kafka/common/serialization/Serdes.java      |  39 +++-
 .../org/apache/kafka/common/utils/Bytes.java    | 178 +++++++++++++++++++
 .../kafka/streams/state/WindowStoreUtils.java   |  10 +-
 .../InMemoryKeyValueStoreSupplier.java          |  13 +-
 .../streams/state/internals/MemoryLRUCache.java |  13 ++
 .../state/internals/MeteredKeyValueStore.java   |   8 +-
 .../state/internals/RawStoreChangeLogger.java   |  56 ------
 .../streams/state/internals/RocksDBStore.java   |  73 ++++----
 .../state/internals/RocksDBWindowStore.java     |  48 ++---
 .../state/internals/StoreChangeLogger.java      |  10 ++
 .../streams/state/KeyValueStoreTestDriver.java  |  15 +-
 .../internals/AbstractKeyValueStoreTest.java    |   2 -
 .../state/internals/StoreChangeLoggerTest.java  |  41 -----
 15 files changed, 386 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
new file mode 100644
index 0000000..ee6a57c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Map;
+
+public class BytesDeserializer implements Deserializer<Bytes> {
+
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    public Bytes deserialize(String topic, byte[] data) {
+        if (data == null)
+            return null;
+
+        return new Bytes(data);
+    }
+
+    public void close() {
+        // nothing to do
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
new file mode 100644
index 0000000..3d04446
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Map;
+
+public class BytesSerializer implements Serializer<Bytes> {
+
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    public byte[] serialize(String topic, Bytes data) {
+        if (data == null)
+            return null;
+
+        return data.get();
+    }
+
+    public void close() {
+        // nothing to do
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index f27f74f..d744522 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -13,6 +13,8 @@
 
 package org.apache.kafka.common.serialization;
 
+import org.apache.kafka.common.utils.Bytes;
+
 import java.nio.ByteBuffer;
 
 /**
@@ -80,6 +82,18 @@ public class Serdes {
         }
     }
 
+    static public final class BytesSerde implements Serde<Bytes> {
+        @Override
+        public Serializer<Bytes> serializer() {
+            return new BytesSerializer();
+        }
+
+        @Override
+        public Deserializer<Bytes> deserializer() {
+            return new BytesDeserializer();
+        }
+    }
+
     static public final class ByteArraySerde implements Serde<byte[]> {
         @Override
         public Serializer<byte[]> serializer() {
@@ -114,10 +128,14 @@ public class Serdes {
             return (Serde<T>) ByteArray();
         }
 
-        if (ByteBufferSerde.class.isAssignableFrom(type)) {
+        if (ByteBuffer.class.isAssignableFrom(type)) {
             return (Serde<T>) ByteBuffer();
         }
 
+        if (Bytes.class.isAssignableFrom(type)) {
+            return (Serde<T>) Bytes();
+        }
+
         // TODO: we can also serializes objects of type T using generic Java serialization by default
         throw new IllegalArgumentException("Unknown class for built-in serializer");
     }
@@ -150,42 +168,49 @@ public class Serdes {
     }
 
     /*
-     * A serde for nullable long type.
+     * A serde for nullable {@code Long} type.
      */
     static public Serde<Long> Long() {
         return new LongSerde();
     }
 
     /*
-     * A serde for nullable int type.
+     * A serde for nullable {@code Integer} type.
      */
     static public Serde<Integer> Integer() {
         return new IntegerSerde();
     }
 
     /*
-     * A serde for nullable long type.
+     * A serde for nullable {@code Double} type.
      */
     static public Serde<Double> Double() {
         return new DoubleSerde();
     }
 
     /*
-     * A serde for nullable string type.
+     * A serde for nullable {@code String} type.
      */
     static public Serde<String> String() {
         return new StringSerde();
     }
 
     /*
-     * A serde for nullable byte array type.
+     * A serde for nullable {@code ByteBuffer} type.
      */
     static public Serde<ByteBuffer> ByteBuffer() {
         return new ByteBufferSerde();
     }
 
     /*
-     * A serde for nullable byte array type.
+     * A serde for nullable {@code Bytes} type.
+     */
+    static public Serde<Bytes> Bytes() {
+        return new BytesSerde();
+    }
+
+    /*
+     * A serde for nullable {@code byte[]} type.
      */
     static public Serde<byte[]> ByteArray() {
         return new ByteArraySerde();

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
new file mode 100644
index 0000000..78340e5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * Utility class that handles immutable byte arrays.
+ */
+public class Bytes implements Comparable<Bytes> {
+
+    private static final char[] HEX_CHARS_UPPER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+
+    private final byte[] bytes;
+
+    // cache the hash code for the string, default to 0
+    private int hashCode;
+
+    public static Bytes wrap(byte[] bytes) {
+        return new Bytes(bytes);
+    }
+
+    /**
+     * Create a Bytes using the byte array.
+     *
+     * @param bytes This array becomes the backing storage for the object.
+     */
+    public Bytes(byte[] bytes) {
+        this.bytes = bytes;
+
+        // initialize hash code to 0
+        hashCode = 0;
+    }
+
+    /**
+     * Get the data from the Bytes.
+     * @return The data is only valid between offset and offset+length.
+     */
+    public byte[] get() {
+        return this.bytes;
+    }
+
+    /**
+     * The hashcode is cached except for the case where it is computed as 0, in which
+     * case we compute the hashcode on every call.
+     *
+     * @return the hashcode
+     */
+    @Override
+    public int hashCode() {
+        if (hashCode == 0) {
+            hashCode = Arrays.hashCode(bytes);
+        }
+
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other)
+            return true;
+
+        // we intentionally use the function to compute hashcode here
+        if (this.hashCode() != other.hashCode())
+            return false;
+
+        if (other instanceof Bytes)
+            return Arrays.equals(this.bytes, ((Bytes) other).get());
+
+        return false;
+    }
+
+    @Override
+    public int compareTo(Bytes that) {
+        return BYTES_LEXICO_COMPARATOR.compare(this.bytes, that.bytes);
+    }
+
+    @Override
+    public String toString() {
+        return Bytes.toString(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Write a printable representation of a byte array. Non-printable
+     * characters are hex escaped in the format \\x%02X, eg:
+     * \x00 \x05 etc.
+     *
+     * This function is brought from org.apache.hadoop.hbase.util.Bytes
+     *
+     * @param b array to write out
+     * @param off offset to start at
+     * @param len length to write
+     * @return string output
+     */
+    private static String toString(final byte[] b, int off, int len) {
+        StringBuilder result = new StringBuilder();
+
+        if (b == null)
+            return result.toString();
+
+        // just in case we are passed a 'len' that is > buffer length...
+        if (off >= b.length)
+            return result.toString();
+
+        if (off + len > b.length)
+            len = b.length - off;
+
+        for (int i = off; i < off + len; ++i) {
+            int ch = b[i] & 0xFF;
+            if (ch >= ' ' && ch <= '~' && ch != '\\') {
+                result.append((char) ch);
+            } else {
+                result.append("\\x");
+                result.append(HEX_CHARS_UPPER[ch / 0x10]);
+                result.append(HEX_CHARS_UPPER[ch % 0x10]);
+            }
+        }
+        return result.toString();
+    }
+
+    /**
+     * A byte array comparator based on lexicograpic ordering.
+     */
+    public final static Comparator<byte[]> BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();
+
+    private interface ByteArrayComparator extends Comparator<byte[]> {
+
+        int compare(final byte[] buffer1, int offset1, int length1,
+                    final byte[] buffer2, int offset2, int length2);
+    }
+
+    private static class LexicographicByteArrayComparator implements ByteArrayComparator {
+
+        @Override
+        public int compare(byte[] buffer1, byte[] buffer2) {
+            return compare(buffer1, 0, buffer1.length, buffer2, 0, buffer2.length);
+        }
+
+        public int compare(final byte[] buffer1, int offset1, int length1,
+                           final byte[] buffer2, int offset2, int length2) {
+
+            // short circuit equal case
+            if (buffer1 == buffer2 &&
+                    offset1 == offset2 &&
+                    length1 == length2) {
+                return 0;
+            }
+
+            // similar to Arrays.compare() but considers offset and length
+            int end1 = offset1 + length1;
+            int end2 = offset2 + length2;
+            for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
+                int a = buffer1[i] & 0xff;
+                int b = buffer2[j] & 0xff;
+                if (a != b) {
+                    return a - b;
+                }
+            }
+            return length1 - length2;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
index fdf3269..2f99ad6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
@@ -21,6 +21,7 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 
 import java.nio.ByteBuffer;
 
@@ -30,13 +31,12 @@ public class WindowStoreUtils {
     private static final int TIMESTAMP_SIZE = 8;
 
     /** Inner byte array serde used for segments */
-    public static final Serde<byte[]> INNER_SERDE = Serdes.ByteArray();
-
-    /** Inner byte array state serde used for segments */
-    public static final StateSerdes<byte[], byte[]> INNER_SERDES = new StateSerdes<>("", INNER_SERDE, INNER_SERDE);
+    public static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes();
+    public static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray();
+    public static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE);
 
     @SuppressWarnings("unchecked")
-    public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0];
+    public static final KeyValueIterator<Bytes, byte[]>[] NO_ITERATORS = (KeyValueIterator<Bytes, byte[]>[]) new KeyValueIterator[0];
 
     public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) {
         byte[] serializedKey = serdes.rawKey(key);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 3a5819c..a25153c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -35,6 +35,10 @@ import java.util.TreeMap;
 /**
  * An in-memory key-value store based on a TreeMap.
  *
+ * Note that the use of array-typed keys is discouraged because they result in incorrect ordering behavior.
+ * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
+ * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
+ *
  * @param <K> The key type
  * @param <V> The value type
  *
@@ -63,7 +67,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<>(new MemoryStore<K, V>(name, keySerde, valueSerde).enableLogging(), "in-memory-state", time);
+        return new MeteredKeyValueStore<>(new MemoryStore<>(name, keySerde, valueSerde).enableLogging(), "in-memory-state", time);
     }
 
     private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
@@ -76,6 +80,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
             this.name = name;
             this.keySerde = keySerde;
             this.valueSerde = valueSerde;
+
+            // TODO: when we have serde associated with class types, we can
+            // improve this situation by passing the comparator here.
             this.map = new TreeMap<>();
         }
 
@@ -131,12 +138,12 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
 
         @Override
         public KeyValueIterator<K, V> range(K from, K to) {
-            return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
+            return new MemoryStoreIterator<>(this.map.subMap(from, true, to, false).entrySet().iterator());
         }
 
         @Override
         public KeyValueIterator<K, V> all() {
-            return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
+            return new MemoryStoreIterator<>(this.map.entrySet().iterator());
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 76dd744..d410e02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -29,6 +29,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * An in-memory LRU cache store based on HashSet and HashMap.
+ *
+ *  * Note that the use of array-typed keys is discouraged because they result in incorrect ordering behavior.
+ * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
+ * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
+
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
 public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
 
     public interface EldestEntryRemovalListener<K, V> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 9808c04..5e5b54a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -137,9 +137,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     public V delete(K key) {
         long startNs = time.nanoseconds();
         try {
-            V value = this.inner.delete(key);
-
-            return value;
+            return this.inner.delete(key);
         } finally {
             this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
         }
@@ -147,12 +145,12 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public KeyValueIterator<K, V> range(K from, K to) {
-        return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
+        return new MeteredKeyValueIterator<>(this.inner.range(from, to), this.rangeTime);
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
-        return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
+        return new MeteredKeyValueIterator<>(this.inner.all(), this.allTime);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
deleted file mode 100644
index 4d99b59..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.WindowStoreUtils;
-
-import java.util.Comparator;
-import java.util.TreeSet;
-
-public class RawStoreChangeLogger extends StoreChangeLogger<byte[], byte[]> {
-
-    private class ByteArrayComparator implements Comparator<byte[]> {
-        @Override
-        public int compare(byte[] left, byte[] right) {
-            for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
-                int a = left[i] & 0xff;
-                int b = right[j] & 0xff;
-
-                if (a != b)
-                    return a - b;
-            }
-            return left.length - right.length;
-        }
-    }
-
-    public RawStoreChangeLogger(String storeName, ProcessorContext context) {
-        this(storeName, context, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
-    }
-
-    public RawStoreChangeLogger(String storeName, ProcessorContext context, int maxDirty, int maxRemoved) {
-        super(storeName, context, context.taskId().partition, WindowStoreUtils.INNER_SERDES, maxDirty, maxRemoved);
-        init();
-    }
-
-    @Override
-    public void init() {
-        this.dirty = new TreeSet<>(new ByteArrayComparator());
-        this.removed = new TreeSet<>(new ByteArrayComparator());
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 944d408..3fef0ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 
+import org.apache.kafka.streams.state.WindowStoreUtils;
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;
@@ -46,6 +48,18 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Set;
 
+/**
+ * A persistent key-value store based on RocksDB.
+ *
+ * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
+ * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
+ * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
 public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     private static final int TTL_NOT_USED = -1;
@@ -80,8 +94,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     private Set<K> cacheDirtyKeys;
     private MemoryLRUCache<K, RocksDBCacheEntry> cache;
-    private StoreChangeLogger<byte[], byte[]> changeLogger;
-    private StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+    private StoreChangeLogger<Bytes, byte[]> changeLogger;
+    private StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
 
     public KeyValueStore<K, V> enableLogging() {
         loggingEnabled = true;
@@ -156,7 +170,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         // open the DB dir
         openDB(context);
 
-        this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
+        this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null;
 
         if (this.cacheSize > 0) {
             this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize)
@@ -170,7 +184,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                         }
                     });
 
-
             this.cacheDirtyKeys = new HashSet<>();
         } else {
             this.cache = null;
@@ -179,10 +192,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
         // value getter should always read directly from rocksDB
         // since it is only for values that are already flushed
-        this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
+        this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
             @Override
-            public byte[] get(byte[] key) {
-                return getInternal(key);
+            public byte[] get(Bytes key) {
+                return getInternal(key.get());
             }
         };
 
@@ -258,7 +271,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             putInternal(rawKey, rawValue);
 
             if (loggingEnabled) {
-                changeLogger.add(rawKey);
+                changeLogger.add(Bytes.wrap(rawKey));
                 changeLogger.maybeLogChange(this.getter);
             }
         }
@@ -325,7 +338,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         if (cache != null)
             flushCache();
 
-        return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
+        return new RocksDBRangeIterator<>(db.newIterator(), serdes, from, to);
     }
 
     @Override
@@ -336,7 +349,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
         RocksIterator innerIter = db.newIterator();
         innerIter.seekToFirst();
-        return new RocksDbIterator<K, V>(innerIter, serdes);
+        return new RocksDbIterator<>(innerIter, serdes);
     }
 
     private void flushCache() {
@@ -348,14 +361,16 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             for (K key : cacheDirtyKeys) {
                 RocksDBCacheEntry entry = cache.get(key);
 
-                entry.isDirty = false;
+                if (entry != null) {
+                    entry.isDirty = false;
 
-                byte[] rawKey = serdes.rawKey(key);
+                    byte[] rawKey = serdes.rawKey(key);
 
-                if (entry.value != null) {
-                    putBatch.add(new KeyValue<>(rawKey, serdes.rawValue(entry.value)));
-                } else {
-                    deleteBatch.add(rawKey);
+                    if (entry.value != null) {
+                        putBatch.add(new KeyValue<>(rawKey, serdes.rawValue(entry.value)));
+                    } else {
+                        deleteBatch.add(rawKey);
+                    }
                 }
             }
 
@@ -363,7 +378,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
             if (loggingEnabled) {
                 for (KeyValue<byte[], byte[]> kv : putBatch)
-                    changeLogger.add(kv.key);
+                    changeLogger.add(Bytes.wrap(kv.key));
             }
 
             // check all removed entries and remove them in rocksDB
@@ -376,7 +391,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 }
 
                 if (loggingEnabled) {
-                    changeLogger.delete(removedKey);
+                    changeLogger.delete(Bytes.wrap(removedKey));
                 }
             }
 
@@ -464,30 +479,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     }
 
-    private static class LexicographicComparator implements Comparator<byte[]> {
-
-        @Override
-        public int compare(byte[] left, byte[] right) {
-            for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
-                int leftByte = left[i] & 0xff;
-                int rightByte = right[j] & 0xff;
-                if (leftByte != rightByte) {
-                    return leftByte - rightByte;
-                }
-            }
-            return left.length - right.length;
-        }
-    }
-
     private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
         // RocksDB's JNI interface does not expose getters/setters that allow the
         // comparator to be pluggable, and the default is lexicographic, so it's
         // safe to just force lexicographic comparator here for now.
-        private final Comparator<byte[]> comparator = new LexicographicComparator();
-        byte[] rawToKey;
+        private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
+        private byte[] rawToKey;
 
-        public RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes,
-                                    K from, K to) {
+        public RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes, K from, K to) {
             super(iter, serdes);
             iter.seek(serdes.rawKey(from));
             this.rawToKey = serdes.rawKey(to);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 5955d21..4c964c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -20,6 +20,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -31,7 +32,6 @@ import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.WindowStoreUtils;
 
-
 import java.io.File;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -48,11 +48,12 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     private static final long USE_CURRENT_TIMESTAMP = -1L;
 
-    private static class Segment extends RocksDBStore<byte[], byte[]> {
+    // use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
+    private static class Segment extends RocksDBStore<Bytes, byte[]> {
         public final long id;
 
         Segment(String segmentName, String windowName, long id) {
-            super(segmentName, windowName, WindowStoreUtils.INNER_SERDE, WindowStoreUtils.INNER_SERDE);
+            super(segmentName, windowName, WindowStoreUtils.INNER_KEY_SERDE, WindowStoreUtils.INNER_VALUE_SERDE);
             this.id = id;
         }
 
@@ -63,14 +64,14 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
     private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
         private final StateSerdes<?, V> serdes;
-        private final KeyValueIterator<byte[], byte[]>[] iterators;
+        private final KeyValueIterator<Bytes, byte[]>[] iterators;
         private int index = 0;
 
         RocksDBWindowStoreIterator(StateSerdes<?, V> serdes) {
             this(serdes, WindowStoreUtils.NO_ITERATORS);
         }
 
-        RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
+        RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, KeyValueIterator<Bytes, byte[]>[] iterators) {
             this.serdes = serdes;
             this.iterators = iterators;
         }
@@ -94,9 +95,9 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             if (index >= iterators.length)
                 throw new NoSuchElementException();
 
-            KeyValue<byte[], byte[]> kv = iterators[index].next();
+            KeyValue<Bytes, byte[]> kv = iterators[index].next();
 
-            return new KeyValue<>(WindowStoreUtils.timestampFromBinaryKey(kv.key),
+            return new KeyValue<>(WindowStoreUtils.timestampFromBinaryKey(kv.key.get()),
                                   serdes.valueFrom(kv.value));
         }
 
@@ -108,7 +109,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         @Override
         public void close() {
-            for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
+            for (KeyValueIterator<Bytes, byte[]> iterator : iterators) {
                 iterator.close();
             }
         }
@@ -121,7 +122,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private final Serde<K> keySerde;
     private final Serde<V> valueSerde;
     private final SimpleDateFormat formatter;
-    private final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+    private final StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
 
     private ProcessorContext context;
     private int seqnum = 0;
@@ -130,7 +131,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     private StateSerdes<K, V> serdes;
 
     private boolean loggingEnabled = false;
-    private StoreChangeLogger<byte[], byte[]> changeLogger = null;
+    private StoreChangeLogger<Bytes, byte[]> changeLogger = null;
 
     public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) {
         this.name = name;
@@ -144,9 +145,9 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         this.retainDuplicates = retainDuplicates;
 
-        this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
-            public byte[] get(byte[] key) {
-                return getInternal(key);
+        this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
+            public byte[] get(Bytes key) {
+                return getInternal(key.get());
             }
         };
 
@@ -178,13 +179,16 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         openExistingSegments();
 
-        this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
+        this.changeLogger = this.loggingEnabled ? new StoreChangeLogger(name, context, WindowStoreUtils.INNER_SERDES) : null;
 
         // register and possibly restore the state from the logs
         context.register(root, loggingEnabled, new StateRestoreCallback() {
             @Override
             public void restore(byte[] key, byte[] value) {
-                putInternal(key, value);
+                // if the value is null, it means that this record has already been
+                // deleted while it was captured in the changelog, hence we do not need to put any more.
+                if (value != null)
+                    putInternal(key, value);
             }
         });
 
@@ -249,7 +253,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         byte[] rawKey = putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
 
         if (rawKey != null && loggingEnabled) {
-            changeLogger.add(rawKey);
+            changeLogger.add(Bytes.wrap(rawKey));
             changeLogger.maybeLogChange(this.getter);
         }
     }
@@ -259,7 +263,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         byte[] rawKey = putAndReturnInternalKey(key, value, timestamp);
 
         if (rawKey != null && loggingEnabled) {
-            changeLogger.add(rawKey);
+            changeLogger.add(Bytes.wrap(rawKey));
             changeLogger.maybeLogChange(this.getter);
         }
     }
@@ -281,7 +285,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
             if (retainDuplicates)
                 seqnum = (seqnum + 1) & 0x7FFFFFFF;
             byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes);
-            segment.put(binaryKey, serdes.rawValue(value));
+            segment.put(Bytes.wrap(binaryKey), serdes.rawValue(value));
             return binaryKey;
         } else {
             return null;
@@ -300,7 +304,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         // If the record is within the retention period, put it in the store.
         Segment segment = getSegment(segmentId);
         if (segment != null)
-            segment.put(binaryKey, binaryValue);
+            segment.put(Bytes.wrap(binaryKey), binaryValue);
     }
 
     private byte[] getInternal(byte[] binaryKey) {
@@ -308,7 +312,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         Segment segment = getSegment(segmentId);
         if (segment != null) {
-            return segment.get(binaryKey);
+            return segment.get(Bytes.wrap(binaryKey));
         } else {
             return null;
         }
@@ -323,12 +327,12 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
         byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
         byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, Integer.MAX_VALUE, serdes);
 
-        ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
+        ArrayList<KeyValueIterator<Bytes, byte[]>> iterators = new ArrayList<>();
 
         for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
             Segment segment = getSegment(segmentId);
             if (segment != null)
-                iterators.add(segment.range(binaryFrom, binaryTo));
+                iterators.add(segment.range(Bytes.wrap(binaryFrom), Bytes.wrap(binaryTo)));
         }
 
         if (iterators.size() > 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index a439117..3f848fe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -27,6 +27,16 @@ import org.apache.kafka.streams.state.StateSerdes;
 import java.util.HashSet;
 import java.util.Set;
 
+/**
+ * Store change log collector that batches updates before sending to Kafka.
+ *
+ * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
+ * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
+ * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
+ *
+ * @param <K>
+ * @param <V>
+ */
 public class StoreChangeLogger<K, V> {
 
     public interface ValueGetter<K, V> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 0468f49..3a35d75 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -198,19 +198,8 @@ public class KeyValueStoreTestDriver<K, V> {
             public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                 // for byte arrays we need to wrap it for comparison
 
-                K key;
-                if (record.key() instanceof byte[]) {
-                    key = serdes.keyFrom((byte[]) record.key());
-                } else {
-                    key = (K) record.key();
-                }
-
-                V value;
-                if (record.key() instanceof byte[]) {
-                    value = serdes.valueFrom((byte[]) record.value());
-                } else {
-                    value = (V) record.value();
-                }
+                K key = serdes.keyFrom(keySerializer.serialize(record.topic(), record.key()));
+                V value = serdes.valueFrom(valueSerializer.serialize(record.topic(), record.value()));
 
                 recordFlushed(key, value);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index fb0efc9..2bfe644 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -192,8 +192,6 @@ public abstract class AbstractKeyValueStoreTest {
         }
     }
 
-
-
     @Test
     public void testPutIfAbsent() {
         // Create the test driver ...

http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 9a477df..09f12fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -24,10 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -63,8 +60,6 @@ public class StoreChangeLoggerTest {
 
     private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
 
-    private final StoreChangeLogger<byte[], byte[]> rawChangeLogger = new RawStoreChangeLogger(topic, context, 3, 3);
-
     private final StoreChangeLogger.ValueGetter<Integer, String> getter = new StoreChangeLogger.ValueGetter<Integer, String>() {
         @Override
         public String get(Integer key) {
@@ -72,16 +67,6 @@ public class StoreChangeLoggerTest {
         }
     };
 
-    private final StoreChangeLogger.ValueGetter<byte[], byte[]> rawGetter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
-        private IntegerDeserializer deserializer = new IntegerDeserializer();
-        private StringSerializer serializer = new StringSerializer();
-
-        @Override
-        public byte[] get(byte[] key) {
-            return serializer.serialize(topic, written.get(deserializer.deserialize(topic, key)));
-        }
-    };
-
     @Test
     public void testAddRemove() {
         written.put(0, "zero");
@@ -117,30 +102,4 @@ public class StoreChangeLoggerTest {
         assertEquals("three", logged.get(3));
         assertEquals("four", logged.get(4));
     }
-
-    @Test
-    public void testRaw() {
-        IntegerSerializer serializer = new IntegerSerializer();
-
-        written.put(0, "zero");
-        rawChangeLogger.add(serializer.serialize(topic, 0));
-        written.put(1, "one");
-        rawChangeLogger.add(serializer.serialize(topic, 1));
-        written.put(2, "two");
-        rawChangeLogger.add(serializer.serialize(topic, 2));
-        assertEquals(3, rawChangeLogger.numDirty());
-        assertEquals(0, rawChangeLogger.numRemoved());
-
-        rawChangeLogger.delete(serializer.serialize(topic, 0));
-        rawChangeLogger.delete(serializer.serialize(topic, 1));
-        written.put(3, "three");
-        rawChangeLogger.add(serializer.serialize(topic, 3));
-        assertEquals(2, rawChangeLogger.numDirty());
-        assertEquals(2, rawChangeLogger.numRemoved());
-
-        written.put(0, "zero-again");
-        rawChangeLogger.add(serializer.serialize(topic, 0));
-        assertEquals(3, rawChangeLogger.numDirty());
-        assertEquals(1, rawChangeLogger.numRemoved());
-    }
 }