You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/26 16:32:26 UTC
kafka git commit: KAFKA-3499: prevent array typed keys in
KeyValueStore
Repository: kafka
Updated Branches:
refs/heads/trunk e7d04c251 -> a02c8aaec
KAFKA-3499: prevent array typed keys in KeyValueStore
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Ismael Juma, Josh Gruenberg, Michael G. Noll, Ewen Cheslack-Postava
Closes #1229 from guozhangwang/K3499
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a02c8aae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a02c8aae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a02c8aae
Branch: refs/heads/trunk
Commit: a02c8aaecfbd13838c2a062bac1455da352028fe
Parents: e7d04c2
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Apr 26 07:32:21 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Apr 26 07:32:21 2016 -0700
----------------------------------------------------------------------
.../common/serialization/BytesDeserializer.java | 35 ++++
.../common/serialization/BytesSerializer.java | 36 ++++
.../kafka/common/serialization/Serdes.java | 39 +++-
.../org/apache/kafka/common/utils/Bytes.java | 178 +++++++++++++++++++
.../kafka/streams/state/WindowStoreUtils.java | 10 +-
.../InMemoryKeyValueStoreSupplier.java | 13 +-
.../streams/state/internals/MemoryLRUCache.java | 13 ++
.../state/internals/MeteredKeyValueStore.java | 8 +-
.../state/internals/RawStoreChangeLogger.java | 56 ------
.../streams/state/internals/RocksDBStore.java | 73 ++++----
.../state/internals/RocksDBWindowStore.java | 48 ++---
.../state/internals/StoreChangeLogger.java | 10 ++
.../streams/state/KeyValueStoreTestDriver.java | 15 +-
.../internals/AbstractKeyValueStoreTest.java | 2 -
.../state/internals/StoreChangeLoggerTest.java | 41 -----
15 files changed, 386 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
new file mode 100644
index 0000000..ee6a57c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesDeserializer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Map;
+
+/**
+ * {@link Deserializer} that wraps raw bytes in a {@link Bytes} instance.
+ */
+public class BytesDeserializer implements Deserializer<Bytes> {
+
+    /**
+     * No-op: this deserializer has nothing to configure.
+     *
+     * @param configs ignored
+     * @param isKey ignored
+     */
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    /**
+     * Wrap the given array in a {@link Bytes}. The array is handed to the {@code Bytes}
+     * constructor as is.
+     *
+     * @param topic ignored
+     * @param data raw bytes, may be {@code null}
+     * @return a {@code Bytes} built from {@code data}, or {@code null} if {@code data} is {@code null}
+     */
+    @Override
+    public Bytes deserialize(String topic, byte[] data) {
+        if (data == null)
+            return null;
+
+        return new Bytes(data);
+    }
+
+    /**
+     * No-op: this deserializer holds no resources.
+     */
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
new file mode 100644
index 0000000..3d04446
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/BytesSerializer.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Map;
+
+/**
+ * {@link Serializer} that unwraps a {@link Bytes} instance into its byte array.
+ */
+public class BytesSerializer implements Serializer<Bytes> {
+
+    /**
+     * No-op: this serializer has nothing to configure.
+     *
+     * @param configs ignored
+     * @param isKey ignored
+     */
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    /**
+     * Return the array obtained from {@link Bytes#get()}.
+     *
+     * @param topic ignored
+     * @param data the wrapped bytes, may be {@code null}
+     * @return the result of {@code data.get()}, or {@code null} if {@code data} is {@code null}
+     */
+    @Override
+    public byte[] serialize(String topic, Bytes data) {
+        if (data == null)
+            return null;
+
+        return data.get();
+    }
+
+    /**
+     * No-op: this serializer holds no resources.
+     */
+    @Override
+    public void close() {
+        // nothing to do
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index f27f74f..d744522 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -13,6 +13,8 @@
package org.apache.kafka.common.serialization;
+import org.apache.kafka.common.utils.Bytes;
+
import java.nio.ByteBuffer;
/**
@@ -80,6 +82,18 @@ public class Serdes {
}
}
+ static public final class BytesSerde implements Serde<Bytes> {
+ @Override
+ public Serializer<Bytes> serializer() {
+ return new BytesSerializer();
+ }
+
+ @Override
+ public Deserializer<Bytes> deserializer() {
+ return new BytesDeserializer();
+ }
+ }
+
static public final class ByteArraySerde implements Serde<byte[]> {
@Override
public Serializer<byte[]> serializer() {
@@ -114,10 +128,14 @@ public class Serdes {
return (Serde<T>) ByteArray();
}
- if (ByteBufferSerde.class.isAssignableFrom(type)) {
+ if (ByteBuffer.class.isAssignableFrom(type)) {
return (Serde<T>) ByteBuffer();
}
+ if (Bytes.class.isAssignableFrom(type)) {
+ return (Serde<T>) Bytes();
+ }
+
// TODO: we can also serializes objects of type T using generic Java serialization by default
throw new IllegalArgumentException("Unknown class for built-in serializer");
}
@@ -150,42 +168,49 @@ public class Serdes {
}
/*
- * A serde for nullable long type.
+ * A serde for nullable {@code Long} type.
*/
static public Serde<Long> Long() {
return new LongSerde();
}
/*
- * A serde for nullable int type.
+ * A serde for nullable {@code Integer} type.
*/
static public Serde<Integer> Integer() {
return new IntegerSerde();
}
/*
- * A serde for nullable long type.
+ * A serde for nullable {@code Double} type.
*/
static public Serde<Double> Double() {
return new DoubleSerde();
}
/*
- * A serde for nullable string type.
+ * A serde for nullable {@code String} type.
*/
static public Serde<String> String() {
return new StringSerde();
}
/*
- * A serde for nullable byte array type.
+ * A serde for nullable {@code ByteBuffer} type.
*/
static public Serde<ByteBuffer> ByteBuffer() {
return new ByteBufferSerde();
}
/*
- * A serde for nullable byte array type.
+ * A serde for nullable {@code Bytes} type.
+ */
+ static public Serde<Bytes> Bytes() {
+ return new BytesSerde();
+ }
+
+ /*
+ * A serde for nullable {@code byte[]} type.
*/
static public Serde<byte[]> ByteArray() {
return new ByteArraySerde();
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
new file mode 100644
index 0000000..78340e5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Bytes.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * Wrapper around a byte array that provides content-based {@link #equals(Object)} and
+ * {@link #hashCode()} plus a lexicographic {@link #compareTo(Bytes)}, so that byte arrays can be
+ * used as keys in hash-based and sorted collections.
+ *
+ * The wrapped array is not copied, neither by the constructor nor by {@link #get()}. Because the
+ * hash code is cached, the array should not be modified while the instance is used as a key.
+ */
+public class Bytes implements Comparable<Bytes> {
+
+    private static final char[] HEX_CHARS_UPPER = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
+
+    private final byte[] bytes;
+
+    // cached hash code of the wrapped array; 0 means "not computed yet"
+    private int hashCode;
+
+    /**
+     * Create a Bytes using the byte array; equivalent to calling the constructor.
+     *
+     * @param bytes This array becomes the backing storage for the object.
+     * @return a new Bytes wrapping {@code bytes}
+     */
+    public static Bytes wrap(byte[] bytes) {
+        return new Bytes(bytes);
+    }
+
+    /**
+     * Create a Bytes using the byte array.
+     *
+     * @param bytes This array becomes the backing storage for the object.
+     */
+    public Bytes(byte[] bytes) {
+        this.bytes = bytes;
+
+        // initialize hash code to 0
+        hashCode = 0;
+    }
+
+    /**
+     * Get the data from the Bytes.
+     *
+     * @return the backing array itself, not a copy
+     */
+    public byte[] get() {
+        return this.bytes;
+    }
+
+    /**
+     * The hashcode is cached except for the case where it is computed as 0, in which
+     * case we compute the hashcode on every call.
+     *
+     * @return the hashcode
+     */
+    @Override
+    public int hashCode() {
+        if (hashCode == 0) {
+            hashCode = Arrays.hashCode(bytes);
+        }
+
+        return hashCode;
+    }
+
+    /**
+     * Compare by array content.
+     *
+     * @param other the object to compare with, may be {@code null}
+     * @return true if {@code other} is a Bytes whose array has the same content; false otherwise,
+     *         including when {@code other} is {@code null}
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other)
+            return true;
+
+        // check the type first: this also rejects null, as the Object.equals contract requires,
+        // instead of failing on other.hashCode()
+        if (!(other instanceof Bytes))
+            return false;
+
+        // we intentionally use the function to compute hashcode here
+        if (this.hashCode() != other.hashCode())
+            return false;
+
+        return Arrays.equals(this.bytes, ((Bytes) other).get());
+    }
+
+    /**
+     * Compare the wrapped arrays with {@link #BYTES_LEXICO_COMPARATOR}.
+     *
+     * @param that the Bytes to compare with
+     * @return a negative, zero or positive value as this array sorts before, equal to or after that array
+     * @throws NullPointerException if {@code that} is null or either wrapped array is null
+     */
+    @Override
+    public int compareTo(Bytes that) {
+        return BYTES_LEXICO_COMPARATOR.compare(this.bytes, that.bytes);
+    }
+
+    /**
+     * @return a printable representation of the wrapped array (see the private helper below);
+     *         the empty string if the wrapped array is null
+     */
+    @Override
+    public String toString() {
+        // guard here: bytes.length would fail before the helper's own null check is reached
+        return bytes == null ? "" : Bytes.toString(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Write a printable representation of a byte array. Non-printable
+     * characters are hex escaped in the format \\x%02X, eg:
+     * \x00 \x05 etc.
+     *
+     * This function is brought from org.apache.hadoop.hbase.util.Bytes
+     *
+     * @param b array to write out
+     * @param off offset to start at
+     * @param len length to write
+     * @return string output
+     */
+    private static String toString(final byte[] b, int off, int len) {
+        StringBuilder result = new StringBuilder();
+
+        if (b == null)
+            return result.toString();
+
+        // just in case we are passed a 'len' that is > buffer length...
+        if (off >= b.length)
+            return result.toString();
+
+        if (off + len > b.length)
+            len = b.length - off;
+
+        for (int i = off; i < off + len; ++i) {
+            int ch = b[i] & 0xFF;
+            // printable ASCII is emitted as is; the backslash itself is escaped as well
+            if (ch >= ' ' && ch <= '~' && ch != '\\') {
+                result.append((char) ch);
+            } else {
+                result.append("\\x");
+                result.append(HEX_CHARS_UPPER[ch / 0x10]);
+                result.append(HEX_CHARS_UPPER[ch % 0x10]);
+            }
+        }
+        return result.toString();
+    }
+
+    /**
+     * A byte array comparator based on lexicographic ordering.
+     */
+    public final static Comparator<byte[]> BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();
+
+    private interface ByteArrayComparator extends Comparator<byte[]> {
+
+        int compare(final byte[] buffer1, int offset1, int length1,
+                    final byte[] buffer2, int offset2, int length2);
+    }
+
+    private static class LexicographicByteArrayComparator implements ByteArrayComparator {
+
+        @Override
+        public int compare(byte[] buffer1, byte[] buffer2) {
+            return compare(buffer1, 0, buffer1.length, buffer2, 0, buffer2.length);
+        }
+
+        @Override
+        public int compare(final byte[] buffer1, int offset1, int length1,
+                           final byte[] buffer2, int offset2, int length2) {
+
+            // short circuit equal case
+            if (buffer1 == buffer2 &&
+                offset1 == offset2 &&
+                length1 == length2) {
+                return 0;
+            }
+
+            // compare byte by byte as unsigned values, honoring offset and length
+            int end1 = offset1 + length1;
+            int end2 = offset2 + length2;
+            for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
+                int a = buffer1[i] & 0xff;
+                int b = buffer2[j] & 0xff;
+                if (a != b) {
+                    return a - b;
+                }
+            }
+            // all shared positions are equal: the shorter range sorts first
+            return length1 - length2;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
index fdf3269..2f99ad6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
@@ -21,6 +21,7 @@ package org.apache.kafka.streams.state;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import java.nio.ByteBuffer;
@@ -30,13 +31,12 @@ public class WindowStoreUtils {
private static final int TIMESTAMP_SIZE = 8;
/** Inner byte array serde used for segments */
- public static final Serde<byte[]> INNER_SERDE = Serdes.ByteArray();
-
- /** Inner byte array state serde used for segments */
- public static final StateSerdes<byte[], byte[]> INNER_SERDES = new StateSerdes<>("", INNER_SERDE, INNER_SERDE);
+ public static final Serde<Bytes> INNER_KEY_SERDE = Serdes.Bytes();
+ public static final Serde<byte[]> INNER_VALUE_SERDE = Serdes.ByteArray();
+ public static final StateSerdes<Bytes, byte[]> INNER_SERDES = new StateSerdes<>("rocksDB-inner", INNER_KEY_SERDE, INNER_VALUE_SERDE);
@SuppressWarnings("unchecked")
- public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0];
+ public static final KeyValueIterator<Bytes, byte[]>[] NO_ITERATORS = (KeyValueIterator<Bytes, byte[]>[]) new KeyValueIterator[0];
public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, StateSerdes<K, ?> serdes) {
byte[] serializedKey = serdes.rawKey(key);
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 3a5819c..a25153c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -35,6 +35,10 @@ import java.util.TreeMap;
/**
* An in-memory key-value store based on a TreeMap.
*
+ * Note that the use of array-typed keys is discouraged because they result in incorrect ordering behavior.
+ * If you intend to work on byte arrays as keys, for example, you may want to wrap them with the {@code Bytes} class,
+ * i.e. use {@code InMemoryKeyValueStoreSupplier<Bytes, ...>} rather than {@code InMemoryKeyValueStoreSupplier<byte[], ...>}.
+ *
* @param <K> The key type
* @param <V> The value type
*
@@ -63,7 +67,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
public StateStore get() {
- return new MeteredKeyValueStore<>(new MemoryStore<K, V>(name, keySerde, valueSerde).enableLogging(), "in-memory-state", time);
+ return new MeteredKeyValueStore<>(new MemoryStore<>(name, keySerde, valueSerde).enableLogging(), "in-memory-state", time);
}
private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
@@ -76,6 +80,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
this.name = name;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
+
+ // TODO: when we have serde associated with class types, we can
+ // improve this situation by passing the comparator here.
this.map = new TreeMap<>();
}
@@ -131,12 +138,12 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
@Override
public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
+ return new MemoryStoreIterator<>(this.map.subMap(from, true, to, false).entrySet().iterator());
}
@Override
public KeyValueIterator<K, V> all() {
- return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
+ return new MemoryStoreIterator<>(this.map.entrySet().iterator());
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 76dd744..d410e02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -29,6 +29,19 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+/**
+ * An in-memory LRU cache store based on HashSet and HashMap.
+ *
+ * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
+ * If you intend to work on byte arrays as keys, for example, you may want to wrap them with the {@code Bytes} class,
+ * i.e. use {@code MemoryLRUCache<Bytes, ...>} rather than {@code MemoryLRUCache<byte[], ...>}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
public interface EldestEntryRemovalListener<K, V> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 9808c04..5e5b54a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -137,9 +137,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
public V delete(K key) {
long startNs = time.nanoseconds();
try {
- V value = this.inner.delete(key);
-
- return value;
+ return this.inner.delete(key);
} finally {
this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
}
@@ -147,12 +145,12 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
@Override
public KeyValueIterator<K, V> range(K from, K to) {
- return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
+ return new MeteredKeyValueIterator<>(this.inner.range(from, to), this.rangeTime);
}
@Override
public KeyValueIterator<K, V> all() {
- return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
+ return new MeteredKeyValueIterator<>(this.inner.all(), this.allTime);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
deleted file mode 100644
index 4d99b59..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.WindowStoreUtils;
-
-import java.util.Comparator;
-import java.util.TreeSet;
-
-public class RawStoreChangeLogger extends StoreChangeLogger<byte[], byte[]> {
-
- private class ByteArrayComparator implements Comparator<byte[]> {
- @Override
- public int compare(byte[] left, byte[] right) {
- for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
- int a = left[i] & 0xff;
- int b = right[j] & 0xff;
-
- if (a != b)
- return a - b;
- }
- return left.length - right.length;
- }
- }
-
- public RawStoreChangeLogger(String storeName, ProcessorContext context) {
- this(storeName, context, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
- }
-
- public RawStoreChangeLogger(String storeName, ProcessorContext context, int maxDirty, int maxRemoved) {
- super(storeName, context, context.taskId().partition, WindowStoreUtils.INNER_SERDES, maxDirty, maxRemoved);
- init();
- }
-
- @Override
- public void init() {
- this.dirty = new TreeSet<>(new ByteArrayComparator());
- this.removed = new TreeSet<>(new ByteArrayComparator());
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 944d408..3fef0ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.WindowStoreUtils;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
@@ -46,6 +48,18 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
+/**
+ * A persistent key-value store based on RocksDB.
+ *
+ * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
+ * If you intend to work on byte arrays as key, for example, you may want to wrap them with the {@code Bytes} class,
+ * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private static final int TTL_NOT_USED = -1;
@@ -80,8 +94,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private Set<K> cacheDirtyKeys;
private MemoryLRUCache<K, RocksDBCacheEntry> cache;
- private StoreChangeLogger<byte[], byte[]> changeLogger;
- private StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+ private StoreChangeLogger<Bytes, byte[]> changeLogger;
+ private StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
public KeyValueStore<K, V> enableLogging() {
loggingEnabled = true;
@@ -156,7 +170,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
// open the DB dir
openDB(context);
- this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
+ this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, WindowStoreUtils.INNER_SERDES) : null;
if (this.cacheSize > 0) {
this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize)
@@ -170,7 +184,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
});
-
this.cacheDirtyKeys = new HashSet<>();
} else {
this.cache = null;
@@ -179,10 +192,10 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
// value getter should always read directly from rocksDB
// since it is only for values that are already flushed
- this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
+ this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
@Override
- public byte[] get(byte[] key) {
- return getInternal(key);
+ public byte[] get(Bytes key) {
+ return getInternal(key.get());
}
};
@@ -258,7 +271,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
putInternal(rawKey, rawValue);
if (loggingEnabled) {
- changeLogger.add(rawKey);
+ changeLogger.add(Bytes.wrap(rawKey));
changeLogger.maybeLogChange(this.getter);
}
}
@@ -325,7 +338,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
if (cache != null)
flushCache();
- return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
+ return new RocksDBRangeIterator<>(db.newIterator(), serdes, from, to);
}
@Override
@@ -336,7 +349,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
RocksIterator innerIter = db.newIterator();
innerIter.seekToFirst();
- return new RocksDbIterator<K, V>(innerIter, serdes);
+ return new RocksDbIterator<>(innerIter, serdes);
}
private void flushCache() {
@@ -348,14 +361,16 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
for (K key : cacheDirtyKeys) {
RocksDBCacheEntry entry = cache.get(key);
- entry.isDirty = false;
+ if (entry != null) {
+ entry.isDirty = false;
- byte[] rawKey = serdes.rawKey(key);
+ byte[] rawKey = serdes.rawKey(key);
- if (entry.value != null) {
- putBatch.add(new KeyValue<>(rawKey, serdes.rawValue(entry.value)));
- } else {
- deleteBatch.add(rawKey);
+ if (entry.value != null) {
+ putBatch.add(new KeyValue<>(rawKey, serdes.rawValue(entry.value)));
+ } else {
+ deleteBatch.add(rawKey);
+ }
}
}
@@ -363,7 +378,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
if (loggingEnabled) {
for (KeyValue<byte[], byte[]> kv : putBatch)
- changeLogger.add(kv.key);
+ changeLogger.add(Bytes.wrap(kv.key));
}
// check all removed entries and remove them in rocksDB
@@ -376,7 +391,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
if (loggingEnabled) {
- changeLogger.delete(removedKey);
+ changeLogger.delete(Bytes.wrap(removedKey));
}
}
@@ -464,30 +479,14 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
- private static class LexicographicComparator implements Comparator<byte[]> {
-
- @Override
- public int compare(byte[] left, byte[] right) {
- for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
- int leftByte = left[i] & 0xff;
- int rightByte = right[j] & 0xff;
- if (leftByte != rightByte) {
- return leftByte - rightByte;
- }
- }
- return left.length - right.length;
- }
- }
-
private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
// RocksDB's JNI interface does not expose getters/setters that allow the
// comparator to be pluggable, and the default is lexicographic, so it's
// safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator = new LexicographicComparator();
- byte[] rawToKey;
+ private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
+ private byte[] rawToKey;
- public RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes,
- K from, K to) {
+ public RocksDBRangeIterator(RocksIterator iter, StateSerdes<K, V> serdes, K from, K to) {
super(iter, serdes);
iter.seek(serdes.rawKey(from));
this.rawToKey = serdes.rawKey(to);
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 5955d21..4c964c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -20,6 +20,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -31,7 +32,6 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.WindowStoreUtils;
-
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -48,11 +48,12 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
private static final long USE_CURRENT_TIMESTAMP = -1L;
- private static class Segment extends RocksDBStore<byte[], byte[]> {
+ // use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures
+ private static class Segment extends RocksDBStore<Bytes, byte[]> {
public final long id;
Segment(String segmentName, String windowName, long id) {
- super(segmentName, windowName, WindowStoreUtils.INNER_SERDE, WindowStoreUtils.INNER_SERDE);
+ super(segmentName, windowName, WindowStoreUtils.INNER_KEY_SERDE, WindowStoreUtils.INNER_VALUE_SERDE);
this.id = id;
}
@@ -63,14 +64,14 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
private final StateSerdes<?, V> serdes;
- private final KeyValueIterator<byte[], byte[]>[] iterators;
+ private final KeyValueIterator<Bytes, byte[]>[] iterators;
private int index = 0;
RocksDBWindowStoreIterator(StateSerdes<?, V> serdes) {
this(serdes, WindowStoreUtils.NO_ITERATORS);
}
- RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
+ RocksDBWindowStoreIterator(StateSerdes<?, V> serdes, KeyValueIterator<Bytes, byte[]>[] iterators) {
this.serdes = serdes;
this.iterators = iterators;
}
@@ -94,9 +95,9 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
if (index >= iterators.length)
throw new NoSuchElementException();
- KeyValue<byte[], byte[]> kv = iterators[index].next();
+ KeyValue<Bytes, byte[]> kv = iterators[index].next();
- return new KeyValue<>(WindowStoreUtils.timestampFromBinaryKey(kv.key),
+ return new KeyValue<>(WindowStoreUtils.timestampFromBinaryKey(kv.key.get()),
serdes.valueFrom(kv.value));
}
@@ -108,7 +109,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
@Override
public void close() {
- for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
+ for (KeyValueIterator<Bytes, byte[]> iterator : iterators) {
iterator.close();
}
}
@@ -121,7 +122,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private final SimpleDateFormat formatter;
- private final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+ private final StoreChangeLogger.ValueGetter<Bytes, byte[]> getter;
private ProcessorContext context;
private int seqnum = 0;
@@ -130,7 +131,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
private StateSerdes<K, V> serdes;
private boolean loggingEnabled = false;
- private StoreChangeLogger<byte[], byte[]> changeLogger = null;
+ private StoreChangeLogger<Bytes, byte[]> changeLogger = null;
public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serde<K> keySerde, Serde<V> valueSerde) {
this.name = name;
@@ -144,9 +145,9 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
this.retainDuplicates = retainDuplicates;
- this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
- public byte[] get(byte[] key) {
- return getInternal(key);
+ this.getter = new StoreChangeLogger.ValueGetter<Bytes, byte[]>() {
+ public byte[] get(Bytes key) {
+ return getInternal(key.get());
}
};
@@ -178,13 +179,16 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
openExistingSegments();
- this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
+ this.changeLogger = this.loggingEnabled ? new StoreChangeLogger(name, context, WindowStoreUtils.INNER_SERDES) : null;
// register and possibly restore the state from the logs
context.register(root, loggingEnabled, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
- putInternal(key, value);
+ // a null value means that this record had already been deleted by the time
+ // it was captured in the changelog, hence there is nothing to put into the store.
+ if (value != null)
+ putInternal(key, value);
}
});
@@ -249,7 +253,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
byte[] rawKey = putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
if (rawKey != null && loggingEnabled) {
- changeLogger.add(rawKey);
+ changeLogger.add(Bytes.wrap(rawKey));
changeLogger.maybeLogChange(this.getter);
}
}
@@ -259,7 +263,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
byte[] rawKey = putAndReturnInternalKey(key, value, timestamp);
if (rawKey != null && loggingEnabled) {
- changeLogger.add(rawKey);
+ changeLogger.add(Bytes.wrap(rawKey));
changeLogger.maybeLogChange(this.getter);
}
}
@@ -281,7 +285,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
if (retainDuplicates)
seqnum = (seqnum + 1) & 0x7FFFFFFF;
byte[] binaryKey = WindowStoreUtils.toBinaryKey(key, timestamp, seqnum, serdes);
- segment.put(binaryKey, serdes.rawValue(value));
+ segment.put(Bytes.wrap(binaryKey), serdes.rawValue(value));
return binaryKey;
} else {
return null;
@@ -300,7 +304,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
// If the record is within the retention period, put it in the store.
Segment segment = getSegment(segmentId);
if (segment != null)
- segment.put(binaryKey, binaryValue);
+ segment.put(Bytes.wrap(binaryKey), binaryValue);
}
private byte[] getInternal(byte[] binaryKey) {
@@ -308,7 +312,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
Segment segment = getSegment(segmentId);
if (segment != null) {
- return segment.get(binaryKey);
+ return segment.get(Bytes.wrap(binaryKey));
} else {
return null;
}
@@ -323,12 +327,12 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
byte[] binaryFrom = WindowStoreUtils.toBinaryKey(key, timeFrom, 0, serdes);
byte[] binaryTo = WindowStoreUtils.toBinaryKey(key, timeTo, Integer.MAX_VALUE, serdes);
- ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
+ ArrayList<KeyValueIterator<Bytes, byte[]>> iterators = new ArrayList<>();
for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
Segment segment = getSegment(segmentId);
if (segment != null)
- iterators.add(segment.range(binaryFrom, binaryTo));
+ iterators.add(segment.range(Bytes.wrap(binaryFrom), Bytes.wrap(binaryTo)));
}
if (iterators.size() > 0) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index a439117..3f848fe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -27,6 +27,16 @@ import org.apache.kafka.streams.state.StateSerdes;
import java.util.HashSet;
import java.util.Set;
+/**
+ * Store change log collector that batches updates before sending to Kafka.
+ *
+ * Note that the use of array-typed keys is discouraged because they result in incorrect caching behavior.
+ * If you intend to use byte arrays as keys, for example, wrap them with the {@code Bytes} class,
+ * i.e. use {@code RocksDBStore<Bytes, ...>} rather than {@code RocksDBStore<byte[], ...>}.
+ *
+ * @param <K> the type of the keys whose changes are logged
+ * @param <V> the type of the values returned for those keys
+ */
public class StoreChangeLogger<K, V> {
public interface ValueGetter<K, V> {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 0468f49..3a35d75 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -198,19 +198,8 @@ public class KeyValueStoreTestDriver<K, V> {
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
// for byte arrays we need to wrap it for comparison
- K key;
- if (record.key() instanceof byte[]) {
- key = serdes.keyFrom((byte[]) record.key());
- } else {
- key = (K) record.key();
- }
-
- V value;
- if (record.key() instanceof byte[]) {
- value = serdes.valueFrom((byte[]) record.value());
- } else {
- value = (V) record.value();
- }
+ K key = serdes.keyFrom(keySerializer.serialize(record.topic(), record.key()));
+ V value = serdes.valueFrom(valueSerializer.serialize(record.topic(), record.value()));
recordFlushed(key, value);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index fb0efc9..2bfe644 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -192,8 +192,6 @@ public abstract class AbstractKeyValueStoreTest {
}
}
-
-
@Test
public void testPutIfAbsent() {
// Create the test driver ...
http://git-wip-us.apache.org/repos/asf/kafka/blob/a02c8aae/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index 9a477df..09f12fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -24,10 +24,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.RecordCollector;
@@ -63,8 +60,6 @@ public class StoreChangeLoggerTest {
private final StoreChangeLogger<Integer, String> changeLogger = new StoreChangeLogger<>(topic, context, StateSerdes.withBuiltinTypes(topic, Integer.class, String.class), 3, 3);
- private final StoreChangeLogger<byte[], byte[]> rawChangeLogger = new RawStoreChangeLogger(topic, context, 3, 3);
-
private final StoreChangeLogger.ValueGetter<Integer, String> getter = new StoreChangeLogger.ValueGetter<Integer, String>() {
@Override
public String get(Integer key) {
@@ -72,16 +67,6 @@ public class StoreChangeLoggerTest {
}
};
- private final StoreChangeLogger.ValueGetter<byte[], byte[]> rawGetter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
- private IntegerDeserializer deserializer = new IntegerDeserializer();
- private StringSerializer serializer = new StringSerializer();
-
- @Override
- public byte[] get(byte[] key) {
- return serializer.serialize(topic, written.get(deserializer.deserialize(topic, key)));
- }
- };
-
@Test
public void testAddRemove() {
written.put(0, "zero");
@@ -117,30 +102,4 @@ public class StoreChangeLoggerTest {
assertEquals("three", logged.get(3));
assertEquals("four", logged.get(4));
}
-
- @Test
- public void testRaw() {
- IntegerSerializer serializer = new IntegerSerializer();
-
- written.put(0, "zero");
- rawChangeLogger.add(serializer.serialize(topic, 0));
- written.put(1, "one");
- rawChangeLogger.add(serializer.serialize(topic, 1));
- written.put(2, "two");
- rawChangeLogger.add(serializer.serialize(topic, 2));
- assertEquals(3, rawChangeLogger.numDirty());
- assertEquals(0, rawChangeLogger.numRemoved());
-
- rawChangeLogger.delete(serializer.serialize(topic, 0));
- rawChangeLogger.delete(serializer.serialize(topic, 1));
- written.put(3, "three");
- rawChangeLogger.add(serializer.serialize(topic, 3));
- assertEquals(2, rawChangeLogger.numDirty());
- assertEquals(2, rawChangeLogger.numRemoved());
-
- written.put(0, "zero-again");
- rawChangeLogger.add(serializer.serialize(topic, 0));
- assertEquals(3, rawChangeLogger.numDirty());
- assertEquals(1, rawChangeLogger.numRemoved());
- }
}