You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/02 01:11:28 UTC
[2/2] kafka git commit: KAFKA-3060: Refactor MeteredStore and
RockDBStore Impl
KAFKA-3060: Refactor MeteredStore and RockDBStore Impl
Changes include:
1) Move logging logic from MeteredXXXStore to internal stores, and leave WindowedStore API clean by removing all internalPut/Get functions.
2) Wrap common logging behavior of InMemory and LRUCache stores into one class.
3) Fix a bug for StoreChangeLogger where byte arrays are not comparable in HashSet by using a specified RawStoreChangeLogger.
4) Add a caching layer on top of RocksDBStore with object caching; it relies on the object's equals and hashCode functions being consistent with serdes.
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda <ya...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #826 from guozhangwang/K3060
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57da044a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57da044a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57da044a
Branch: refs/heads/trunk
Commit: 57da044a991ebf8913d44dfcfa6a27729f54a4d5
Parents: 66ecf3f
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Feb 1 16:11:13 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Feb 1 16:11:13 2016 -0800
----------------------------------------------------------------------
.../examples/pageview/PageViewTypedJob.java | 6 +-
.../examples/pageview/PageViewUnTypedJob.java | 6 +-
.../kafka/streams/examples/pipe/PipeJob.java | 4 +-
.../examples/wordcount/WordCountJob.java | 4 +-
.../org/apache/kafka/streams/StreamsConfig.java | 1 +
.../streams/kstream/internals/KStreamImpl.java | 76 +++----
.../streams/kstream/internals/KTableImpl.java | 4 +-
.../kstream/internals/KTableStoreSupplier.java | 2 +-
.../org/apache/kafka/streams/state/Stores.java | 36 ++-
.../apache/kafka/streams/state/WindowStore.java | 7 -
.../kafka/streams/state/WindowStoreUtil.java | 55 -----
.../kafka/streams/state/WindowStoreUtils.java | 54 +++++
.../internals/InMemoryKeyValueLoggedStore.java | 132 +++++++++++
.../InMemoryKeyValueStoreSupplier.java | 28 ++-
.../InMemoryLRUCacheStoreSupplier.java | 151 +------------
.../streams/state/internals/MemoryLRUCache.java | 151 +++++++++++++
.../internals/MemoryNavigableLRUCache.java | 103 +++++++++
.../state/internals/MeteredKeyValueStore.java | 79 +------
.../state/internals/MeteredWindowStore.java | 68 +-----
.../state/internals/OffsetCheckpoint.java | 8 +-
.../state/internals/RawStoreChangeLogger.java | 56 +++++
.../internals/RocksDBKeyValueStoreSupplier.java | 5 +-
.../streams/state/internals/RocksDBStore.java | 221 +++++++++++++++++--
.../state/internals/RocksDBWindowStore.java | 80 +++++--
.../internals/RocksDBWindowStoreSupplier.java | 4 +-
.../state/internals/StoreChangeLogger.java | 47 +++-
.../internals/ProcessorTopologyTest.java | 4 +-
.../streams/state/KeyValueStoreTestDriver.java | 54 +++--
.../kafka/streams/state/StateTestUtils.java | 79 +++++++
.../apache/kafka/streams/state/StateUtils.java | 79 -------
.../internals/AbstractKeyValueStoreTest.java | 2 +-
.../internals/InMemoryLRUCacheStoreTest.java | 197 +++++++----------
.../state/internals/OffsetCheckpointTest.java | 61 +++++
.../internals/RocksDBKeyValueStoreTest.java | 4 +-
.../state/internals/RocksDBWindowStoreTest.java | 8 +-
.../state/internals/StoreChangeLoggerTest.java | 146 ++++++++++++
.../apache/kafka/test/MockProcessorContext.java | 11 +
.../kafka/test/MockStateStoreSupplier.java | 10 +-
38 files changed, 1358 insertions(+), 685 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
index c064848..7f11512 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Count;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -33,6 +32,7 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;
@@ -121,7 +121,7 @@ public class PageViewTypedJob {
// write to the result topic
regionCount.to("streams-pageviewstats-output", new JsonPOJOSerializer<>(), new JsonPOJOSerializer<>());
- KafkaStreams kstream = new KafkaStreams(builder, props);
- kstream.start();
+ KafkaStreams streams = new KafkaStreams(builder, props);
+ streams.start();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
index 1ae02c9..013332e 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
@@ -53,8 +53,6 @@ public class PageViewUnTypedJob {
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
- StreamsConfig config = new StreamsConfig(props);
-
KStreamBuilder builder = new KStreamBuilder();
final Serializer<String> stringSerializer = new StringSerializer();
@@ -101,7 +99,7 @@ public class PageViewUnTypedJob {
// write to the result topic
regionCount.to("streams-pageviewstats-output");
- KafkaStreams kstream = new KafkaStreams(builder, config);
- kstream.start();
+ KafkaStreams streams = new KafkaStreams(builder, props);
+ streams.start();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
index 4a4f97f..841f37b 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
@@ -44,7 +44,7 @@ public class PipeJob {
builder.stream("streams-file-input").to("streams-pipe-output");
- KafkaStreams kstream = new KafkaStreams(builder, props);
- kstream.start();
+ KafkaStreams streams = new KafkaStreams(builder, props);
+ streams.start();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
index 8aa15a4..c66e965 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -97,7 +97,7 @@ public class WordCountJob {
counts.to("streams-wordcount-output", stringSerializer, JsonSerializer);
- KafkaStreams kstream = new KafkaStreams(builder, props);
- kstream.start();
+ KafkaStreams streams = new KafkaStreams(builder, props);
+ streams.start();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 16bb06a..041d0e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -157,6 +157,7 @@ public class StreamsConfig extends AbstractConfig {
ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
.define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
Type.CLASS,
+ WALLCLOCK_TIMESTAMP_EXTRACTOR,
Importance.MEDIUM,
TIMESTAMP_EXTRACTOR_CLASS_DOC)
.define(PARTITION_GROUPER_CLASS_CONFIG,
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7ebc28c..73f7266 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -36,9 +36,9 @@ import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.Stores;
import java.lang.reflect.Array;
import java.util.HashSet;
@@ -304,23 +304,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
- RocksDBWindowStoreSupplier<K, V> thisWindow =
- new RocksDBWindowStoreSupplier<>(
- windows.name() + "-this",
- windows.maintainMs(),
- windows.segments,
- true,
- new Serdes<>("", keySerializer, keyDeserializer, thisValueSerializer, thisValueDeserializer),
- null);
-
- RocksDBWindowStoreSupplier<K, V1> otherWindow =
- new RocksDBWindowStoreSupplier<>(
- windows.name() + "-other",
- windows.maintainMs(),
- windows.segments,
- true,
- new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
- null);
+ StateStoreSupplier thisWindow = Stores.create(windows.name() + "-this")
+ .withKeys(keySerializer, keyDeserializer)
+ .withValues(thisValueSerializer, thisValueDeserializer)
+ .persistent()
+ .windowed(windows.maintainMs(), windows.segments, true)
+ .build();
+
+ StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
+ .withKeys(keySerializer, keyDeserializer)
+ .withValues(otherValueSerializer, otherValueDeserializer)
+ .persistent()
+ .windowed(windows.maintainMs(), windows.segments, true)
+ .build();
KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
@@ -360,14 +356,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
- RocksDBWindowStoreSupplier<K, V1> otherWindow =
- new RocksDBWindowStoreSupplier<>(
- windows.name() + "-this",
- windows.maintainMs(),
- windows.segments,
- true,
- new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
- null);
+ StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
+ .withKeys(keySerializer, keyDeserializer)
+ .withValues(otherValueSerializer, otherValueDeserializer)
+ .persistent()
+ .windowed(windows.maintainMs(), windows.segments, true)
+ .build();
KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
@@ -410,14 +404,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamReduce<>(windows, windows.name(), reducer);
- RocksDBWindowStoreSupplier<K, V> aggregateStore =
- new RocksDBWindowStoreSupplier<>(
- windows.name(),
- windows.maintainMs(),
- windows.segments,
- false,
- new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer),
- null);
+ StateStoreSupplier aggregateStore = Stores.create(windows.name())
+ .withKeys(keySerializer, keyDeserializer)
+ .withValues(aggValueSerializer, aggValueDeserializer)
+ .persistent()
+ .windowed(windows.maintainMs(), windows.segments, false)
+ .build();
// aggregate the values with the aggregator and local store
topology.addProcessor(selectName, aggWindowSupplier, this.name);
@@ -444,14 +436,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregator);
- RocksDBWindowStoreSupplier<K, T> aggregateStore =
- new RocksDBWindowStoreSupplier<>(
- windows.name(),
- windows.maintainMs(),
- windows.segments,
- false,
- new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer),
- null);
+ StateStoreSupplier aggregateStore = Stores.create(windows.name())
+ .withKeys(keySerializer, keyDeserializer)
+ .withValues(aggValueSerializer, aggValueDeserializer)
+ .persistent()
+ .windowed(windows.maintainMs(), windows.segments, false)
+ .build();
// aggregate the values with the aggregator and local store
topology.addProcessor(selectName, aggWindowSupplier, this.name);
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index d046090..4398e3f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -273,7 +273,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
StateStoreSupplier aggregateStore = Stores.create(name)
.withKeys(keySerializer, keyDeserializer)
.withValues(aggValueSerializer, aggValueDeserializer)
- .localDatabase()
+ .persistent()
.build();
// select the aggregate key and values (old and new), it would require parent to send old values
@@ -322,7 +322,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
StateStoreSupplier aggregateStore = Stores.create(name)
.withKeys(keySerializer, keyDeserializer)
.withValues(valueSerializer, valueDeserializer)
- .localDatabase()
+ .persistent()
.build();
// select the aggregate key and values (old and new), it would require parent to send old values
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
index c993512..ffd5cf0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
@@ -52,7 +52,7 @@ public class KTableStoreSupplier<K, V> implements StateStoreSupplier {
}
public StateStore get() {
- return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes), serdes, "rocksdb-state", time).disableLogging();
+ return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes), "rocksdb-state", time);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 46b2592..e9d82bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
/**
* Factory for creating key-value stores.
@@ -76,10 +77,27 @@ public class Stores {
}
@Override
- public LocalDatabaseKeyValueFactory<K, V> localDatabase() {
- return new LocalDatabaseKeyValueFactory<K, V>() {
+ public PersistentKeyValueFactory<K, V> persistent() {
+ return new PersistentKeyValueFactory<K, V>() {
+ private int numSegments = 0;
+ private long retentionPeriod = 0L;
+ private boolean retainDuplicates = false;
+
+ @Override
+ public PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates) {
+ this.numSegments = numSegments;
+ this.retentionPeriod = retentionPeriod;
+ this.retainDuplicates = retainDuplicates;
+
+ return this;
+ }
+
@Override
public StateStoreSupplier build() {
+ if (numSegments > 0) {
+ return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, serdes, null);
+ }
+
return new RocksDBKeyValueStoreSupplier<>(name, serdes, null);
}
};
@@ -237,7 +255,7 @@ public class Stores {
*
* @return the factory to create in-memory key-value stores; never null
*/
- LocalDatabaseKeyValueFactory<K, V> localDatabase();
+ PersistentKeyValueFactory<K, V> persistent();
}
/**
@@ -270,7 +288,17 @@ public class Stores {
* @param <K> the type of keys
* @param <V> the type of values
*/
- public static interface LocalDatabaseKeyValueFactory<K, V> {
+ public static interface PersistentKeyValueFactory<K, V> {
+
+ /**
+ * Set the persistent store as a windowed key-value store
+ *
+ * @param retentionPeriod the maximum period of time in milli-second to keep each window in this store
+ * @param numSegments the maximum number of segments for rolling the windowed store
+ * @param retainDuplicates whether or not to retain duplicate data within the window
+ */
+ PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates);
+
/**
* Return the instance of StateStoreSupplier of new key-value store.
* @return the key-value store; never null
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index b17d889..1d806e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -27,12 +27,5 @@ public interface WindowStore<K, V> extends StateStore {
void put(K key, V value, long timestamp);
- byte[] putAndReturnInternalKey(K key, V value, long timestamp);
-
WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
-
- void putInternal(byte[] binaryKey, byte[] binaryValue);
-
- byte[] getInternal(byte[] binaryKey);
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java
deleted file mode 100644
index b11a206..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import java.nio.ByteBuffer;
-
-public class WindowStoreUtil<K, V> {
-
- public static final int TIMESTAMP_SIZE = 8;
- public static final int SEQNUM_SIZE = 4;
- public static final Serdes<byte[], byte[]> INNER_SERDES = Serdes.withBuiltinTypes("", byte[].class, byte[].class);
- @SuppressWarnings("unchecked")
- public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0];
-
- public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, Serdes<K, ?> serdes) {
- byte[] serializedKey = serdes.rawKey(key);
-
- ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
- buf.put(serializedKey);
- buf.putLong(timestamp);
- buf.putInt(seqnum);
-
- return buf.array();
- }
-
- public static <K> K keyFromBinaryKey(byte[] binaryKey, Serdes<K, ?> serdes) {
- byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
-
- System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
-
- return serdes.keyFrom(bytes);
- }
-
- public static long timestampFromBinaryKey(byte[] binaryKey) {
- return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
new file mode 100644
index 0000000..3a3d585
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import java.nio.ByteBuffer;
+
+public class WindowStoreUtils<K, V> {
+
+ public static final int TIMESTAMP_SIZE = 8;
+ public static final int SEQNUM_SIZE = 4;
+ public static final Serdes<byte[], byte[]> INNER_SERDES = Serdes.withBuiltinTypes("", byte[].class, byte[].class);
+ @SuppressWarnings("unchecked")
+ public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0];
+
+ public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, Serdes<K, ?> serdes) {
+ byte[] serializedKey = serdes.rawKey(key);
+
+ ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
+ buf.put(serializedKey);
+ buf.putLong(timestamp);
+ buf.putInt(seqnum);
+
+ return buf.array();
+ }
+
+ public static <K> K keyFromBinaryKey(byte[] binaryKey, Serdes<K, ?> serdes) {
+ byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
+
+ System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
+
+ return serdes.keyFrom(bytes);
+ }
+
+ public static long timestampFromBinaryKey(byte[] binaryKey) {
+ return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
new file mode 100644
index 0000000..5be6483
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.List;
+
+public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
+
+ private final KeyValueStore<K, V> inner;
+ private final Serdes<K, V> serdes;
+ private final String topic;
+
+ private StoreChangeLogger<K, V> changeLogger;
+ private StoreChangeLogger.ValueGetter<K, V> getter;
+
+ public InMemoryKeyValueLoggedStore(final String topic, final KeyValueStore<K, V> inner, final Serdes<K, V> serdes) {
+ this.topic = topic;
+ this.inner = inner;
+ this.serdes = serdes;
+ }
+
+ @Override
+ public String name() {
+ return this.topic;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ this.changeLogger = new StoreChangeLogger<>(topic, context, serdes);
+
+ inner.init(context);
+
+ this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
+ @Override
+ public V get(K key) {
+ return inner.get(key);
+ }
+ };
+ }
+
+ @Override
+ public boolean persistent() {
+ return inner.persistent();
+ }
+
+ @Override
+ public V get(K key) {
+ return this.inner.get(key);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ this.inner.put(key, value);
+
+ changeLogger.add(key);
+ changeLogger.maybeLogChange(this.getter);
+ }
+
+ @Override
+ public void putAll(List<KeyValue<K, V>> entries) {
+ this.inner.putAll(entries);
+
+ for (KeyValue<K, V> entry : entries) {
+ K key = entry.key;
+ changeLogger.add(key);
+ }
+ changeLogger.maybeLogChange(this.getter);
+ }
+
+ @Override
+ public V delete(K key) {
+ V value = this.inner.delete(key);
+
+ removed(key);
+
+ return value;
+ }
+
+ /**
+ * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
+ * store.
+ *
+ * @param key the key for the entry that the inner store removed
+ */
+ protected void removed(K key) {
+ changeLogger.delete(key);
+ changeLogger.maybeLogChange(this.getter);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return this.inner.range(from, to);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return this.inner.all();
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public void flush() {
+ this.inner.flush();
+
+ changeLogger.logChange(getter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 4856b09..03290c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -43,13 +44,13 @@ import java.util.TreeMap;
public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
private final String name;
- private final Serdes serdes;
private final Time time;
+ private final Serdes<K, V> serdes;
public InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
this.name = name;
- this.serdes = serdes;
this.time = time;
+ this.serdes = serdes;
}
public String name() {
@@ -57,7 +58,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
public StateStore get() {
- return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
+ return new MeteredKeyValueStore<>(new MemoryStore<K, V>(name).enableLogging(serdes), "in-memory-state", time);
}
private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
@@ -65,12 +66,22 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
private final String name;
private final NavigableMap<K, V> map;
+ private boolean loggingEnabled = false;
+ private Serdes<K, V> serdes = null;
+
public MemoryStore(String name) {
super();
this.name = name;
this.map = new TreeMap<>();
}
+ public KeyValueStore<K, V> enableLogging(Serdes<K, V> serdes) {
+ this.loggingEnabled = true;
+ this.serdes = serdes;
+
+ return new InMemoryKeyValueLoggedStore<>(this.name, this, serdes);
+ }
+
@Override
public String name() {
return this.name;
@@ -78,7 +89,16 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
@Override
public void init(ProcessorContext context) {
- // do-nothing since it is in-memory
+ if (loggingEnabled) {
+ context.register(this, true, new StateRestoreCallback() {
+
+ @Override
+ public void restore(byte[] key, byte[] value) {
+ put(serdes.keyFrom(key), serdes.valueFrom(value));
+ }
+ });
+
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 22ee3f7..9b7936a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -17,21 +17,10 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Serdes;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
/**
* An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
*
@@ -43,7 +32,7 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
private final String name;
private final int capacity;
- private final Serdes serdes;
+ private final Serdes<K, V> serdes;
private final Time time;
public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
@@ -57,143 +46,17 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
return name;
}
+ @SuppressWarnings("unchecked")
public StateStore get() {
- MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
- final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
- cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
+ final MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<K, V>(name, capacity);
+ final InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore) cache.enableLogging(serdes);
+ final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(loggedCache, "in-memory-lru-state", time);
+ cache.whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
@Override
public void apply(K key, V value) {
- store.removed(key);
+ loggedCache.removed(key);
}
});
return store;
}
-
- private static interface EldestEntryRemovalListener<K, V> {
- public void apply(K key, V value);
- }
-
- protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
-
- private final String name;
- private final Map<K, V> map;
- private final NavigableSet<K> keys;
- private EldestEntryRemovalListener<K, V> listener;
-
- public MemoryLRUCache(String name, final int maxCacheSize) {
- this.name = name;
- this.keys = new TreeSet<>();
- // leave room for one extra entry to handle adding an entry before the oldest can be removed
- this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
- if (size() > maxCacheSize) {
- K key = eldest.getKey();
- keys.remove(key);
- if (listener != null) listener.apply(key, eldest.getValue());
- return true;
- }
- return false;
- }
- };
- }
-
- protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
- this.listener = listener;
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public void init(ProcessorContext context) {
- // do-nothing since it is in-memory
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- return this.map.get(key);
- }
-
- @Override
- public void put(K key, V value) {
- this.map.put(key, value);
- this.keys.add(key);
- }
-
- @Override
- public void putAll(List<KeyValue<K, V>> entries) {
- for (KeyValue<K, V> entry : entries)
- put(entry.key, entry.value);
- }
-
- @Override
- public V delete(K key) {
- V value = this.map.remove(key);
- this.keys.remove(key);
- return value;
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
- }
-
- @Override
- public void flush() {
- // do-nothing since it is in-memory
- }
-
- @Override
- public void close() {
- // do-nothing
- }
-
- private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
- private final Iterator<K> keys;
- private final Map<K, V> entries;
- private K lastKey;
-
- public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
- this.keys = keys;
- this.entries = entries;
- }
-
- @Override
- public boolean hasNext() {
- return keys.hasNext();
- }
-
- @Override
- public KeyValue<K, V> next() {
- lastKey = keys.next();
- return new KeyValue<>(lastKey, entries.get(lastKey));
- }
-
- @Override
- public void remove() {
- keys.remove();
- entries.remove(lastKey);
- }
-
- @Override
- public void close() {
- // do nothing
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
new file mode 100644
index 0000000..aaa1efd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An in-memory {@link KeyValueStore} that holds at most a fixed number of entries and evicts the
+ * least recently accessed one once that limit is exceeded.
+ *
+ * Entries live in an access-ordered {@link LinkedHashMap}; {@code keys} mirrors the map's key set.
+ * This class does not support {@link #range} or {@link #all}.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+
+ /**
+ * Callback that receives the key and value of an entry at the moment the cache evicts it.
+ */
+ public interface EldestEntryRemovalListener<K, V> {
+ void apply(K key, V value);
+ }
+
+ protected String name;
+ protected Map<K, V> map;
+ // mirror of the map's key set; kept in step by put(), delete() and eviction
+ protected Set<K> keys;
+
+ protected EldestEntryRemovalListener<K, V> listener;
+
+ private boolean loggingEnabled = false;
+ private Serdes<K, V> serdes = null;
+
+ // for subclasses (MemoryNavigableLRUCache) that initialise name, map and keys themselves
+ public MemoryLRUCache() {}
+
+ /**
+ * Creates a cache backed by a {@link HashSet} key index.
+ *
+ * @param name the store name returned by {@link #name()}
+ * @param maxCacheSize the number of entries above which the eldest entry is evicted
+ */
+ public MemoryLRUCache(String name, final int maxCacheSize) {
+ this.name = name;
+ this.keys = new HashSet<>();
+
+ // leave room for one extra entry to handle adding an entry before the oldest can be removed;
+ // the final 'true' selects access order, so get() and put() both refresh an entry's recency
+ this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ if (size() > maxCacheSize) {
+ K key = eldest.getKey();
+ // drop the key from the index before the map removes the entry itself
+ keys.remove(key);
+ if (listener != null) listener.apply(key, eldest.getValue());
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ /**
+ * Turns on registration with the processor context in {@link #init} and remembers the serdes
+ * used by the restore callback.
+ *
+ * @param serdes the serdes used to decode restored keys and values
+ * @return a new {@link InMemoryKeyValueLoggedStore} wrapping this cache
+ */
+ public KeyValueStore<K, V> enableLogging(Serdes<K, V> serdes) {
+ this.loggingEnabled = true;
+ this.serdes = serdes;
+
+ return new InMemoryKeyValueLoggedStore<>(this.name, this, serdes);
+ }
+
+ /**
+ * Sets the listener notified on eviction, replacing any previous one.
+ *
+ * @param listener the eviction callback
+ * @return this cache, for chaining
+ */
+ public MemoryLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+ this.listener = listener;
+
+ return this;
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ /**
+ * Registers this store with the context when logging has been enabled; otherwise does nothing.
+ * Restored records are decoded with the serdes and re-inserted through {@link #put}.
+ */
+ @Override
+ public void init(ProcessorContext context) {
+ if (loggingEnabled) {
+ context.register(this, true, new StateRestoreCallback() {
+
+ @Override
+ public void restore(byte[] key, byte[] value) {
+ // NOTE(review): a null value is passed straight to serdes.valueFrom() and put();
+ // confirm how deleted keys appear in the changelog and whether they restore correctly
+ put(serdes.keyFrom(key), serdes.valueFrom(value));
+ }
+ });
+
+ }
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ /**
+ * Returns the value for the key, or null if absent. A hit counts as an access and therefore
+ * refreshes the entry's position in the eviction order.
+ */
+ @Override
+ public V get(K key) {
+ return this.map.get(key);
+ }
+
+ /**
+ * Inserts or replaces an entry, possibly evicting the eldest entry as a side effect.
+ */
+ @Override
+ public void put(K key, V value) {
+ // index the key BEFORE touching the map: map.put() may evict the eldest entry, which is this
+ // very key when maxCacheSize is 0, and the eviction hook must find it in the index to drop it.
+ // It also means a key the index rejects never reaches the map.
+ this.keys.add(key);
+ this.map.put(key, value);
+ }
+
+ @Override
+ public void putAll(List<KeyValue<K, V>> entries) {
+ for (KeyValue<K, V> entry : entries)
+ put(entry.key, entry.value);
+ }
+
+ /**
+ * Removes the entry for the key from both the map and the key index.
+ *
+ * @return the value previously mapped to the key, or null if there was none
+ */
+ @Override
+ public V delete(K key) {
+ V value = this.map.remove(key);
+ this.keys.remove(key);
+ return value;
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ throw new UnsupportedOperationException("MemoryLRUCache does not support range() function.");
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ throw new UnsupportedOperationException("MemoryLRUCache does not support all() function.");
+ }
+
+ @Override
+ public void flush() {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public void close() {
+ // do-nothing since it is in-memory
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
new file mode 100644
index 0000000..99bac93
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * A {@link MemoryLRUCache} whose key index is a {@link TreeSet}, which allows it to serve
+ * {@link #range} and {@link #all} in key order. The TreeSet is created without a comparator,
+ * so keys are ordered by their natural ordering.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
+
+ /**
+ * @param name the store name
+ * @param maxCacheSize the number of entries above which the eldest entry is evicted
+ */
+ public MemoryNavigableLRUCache(String name, final int maxCacheSize) {
+ // the no-arg super constructor leaves name, keys and map unset; they are initialised below
+ super();
+
+ this.name = name;
+ this.keys = new TreeSet<>();
+
+ // leave room for one extra entry to handle adding an entry before the oldest can be removed;
+ // the final 'true' selects access order
+ this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ if (size() > maxCacheSize) {
+ K key = eldest.getKey();
+ keys.remove(key);
+ if (listener != null) listener.apply(key, eldest.getValue());
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ /**
+ * Sets the eviction listener; overridden only to narrow the return type for chaining.
+ */
+ @Override
+ public MemoryNavigableLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+ this.listener = listener;
+
+ return this;
+ }
+
+ /**
+ * Iterates, in key order, over the entries whose keys lie between the two bounds.
+ *
+ * @param from the lower bound, inclusive
+ * @param to the upper bound, exclusive
+ */
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ // keys is declared as Set<K> in the superclass, but this class's constructor always assigns a TreeSet
+ final NavigableSet<K> sortedKeys = (NavigableSet<K>) this.keys;
+ return new CacheIterator<>(sortedKeys.subSet(from, true, to, false).iterator(), this.map);
+ }
+
+ /**
+ * Iterates over every entry in key order.
+ */
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new CacheIterator<>(this.keys.iterator(), this.map);
+ }
+
+ /**
+ * Walks a key iterator and looks each value up in the backing map.
+ */
+ private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
+ private final Iterator<K> keys;
+ private final Map<K, V> entries;
+ private K lastKey;
+
+ public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
+ this.keys = keys;
+ this.entries = entries;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return keys.hasNext();
+ }
+
+ @Override
+ public KeyValue<K, V> next() {
+ lastKey = keys.next();
+ // note: on an access-ordered map this lookup counts as an access, so iterating
+ // refreshes the recency of every entry visited
+ return new KeyValue<>(lastKey, entries.get(lastKey));
+ }
+
+ /**
+ * Removes the entry last returned by next() from both the key set and the map.
+ */
+ @Override
+ public void remove() {
+ keys.remove();
+ entries.remove(lastKey);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 6dee4c7..fd308c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -17,25 +17,27 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Serdes;
import java.util.List;
+/**
+ * Metered KeyValueStore wrapper is used for recording operation metrics, and hence its
+ * inner KeyValueStore implementation does not need to provide its own metrics collecting functionality.
+ *
+ * @param <K>
+ * @param <V>
+ */
public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
protected final KeyValueStore<K, V> inner;
- protected final StoreChangeLogger.ValueGetter getter;
- protected final Serdes<K, V> serialization;
protected final String metricScope;
protected final Time time;
@@ -49,27 +51,13 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
private Sensor restoreTime;
private StreamsMetrics metrics;
- private boolean loggingEnabled = true;
- private StoreChangeLogger<K, V> changeLogger = null;
-
// always wrap the store with the metered store
- public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
+ public MeteredKeyValueStore(final KeyValueStore<K, V> inner, String metricScope, Time time) {
this.inner = inner;
- this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
- public V get(K key) {
- return inner.get(key);
- }
- };
- this.serialization = serialization;
this.metricScope = metricScope;
this.time = time != null ? time : new SystemTime();
}
- public MeteredKeyValueStore<K, V> disableLogging() {
- loggingEnabled = false;
- return this;
- }
-
@Override
public String name() {
return inner.name();
@@ -88,22 +76,10 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
- this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
-
// register and possibly restore the state from the logs
long startNs = time.nanoseconds();
- inner.init(context);
try {
- final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
- final Deserializer<V> valDeserializer = serialization.valueDeserializer();
-
- context.register(this, loggingEnabled, new StateRestoreCallback() {
- @Override
- public void restore(byte[] key, byte[] value) {
- inner.put(keyDeserializer.deserialize(name, key),
- valDeserializer.deserialize(name, value));
- }
- });
+ inner.init(context);
} finally {
this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
}
@@ -129,11 +105,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
long startNs = time.nanoseconds();
try {
this.inner.put(key, value);
-
- if (loggingEnabled) {
- changeLogger.add(key);
- changeLogger.maybeLogChange(this.getter);
- }
} finally {
this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
@@ -144,14 +115,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
long startNs = time.nanoseconds();
try {
this.inner.putAll(entries);
-
- if (loggingEnabled) {
- for (KeyValue<K, V> entry : entries) {
- K key = entry.key;
- changeLogger.add(key);
- }
- changeLogger.maybeLogChange(this.getter);
- }
} finally {
this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
}
@@ -163,27 +126,12 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
try {
V value = this.inner.delete(key);
- removed(key);
-
return value;
} finally {
this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
}
}
- /**
- * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
- * store.
- *
- * @param key the key for the entry that the inner store removed
- */
- protected void removed(K key) {
- if (loggingEnabled) {
- changeLogger.delete(key);
- changeLogger.maybeLogChange(this.getter);
- }
- }
-
@Override
public KeyValueIterator<K, V> range(K from, K to) {
return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
@@ -204,9 +152,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
long startNs = time.nanoseconds();
try {
this.inner.flush();
-
- if (loggingEnabled)
- changeLogger.logChange(this.getter);
} finally {
this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
}
@@ -247,7 +192,5 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
}
}
-
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 862c322..33f4c88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -23,45 +23,28 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.state.Serdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
protected final WindowStore<K, V> inner;
- protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
protected final String metricScope;
protected final Time time;
private Sensor putTime;
- private Sensor getTime;
- private Sensor rangeTime;
+ private Sensor fetchTime;
private Sensor flushTime;
private Sensor restoreTime;
private StreamsMetrics metrics;
- private boolean loggingEnabled = true;
- private StoreChangeLogger<byte[], byte[]> changeLogger = null;
-
// always wrap the store with the metered store
public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
this.inner = inner;
- this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
- public byte[] get(byte[] key) {
- return inner.getInternal(key);
- }
- };
this.metricScope = metricScope;
this.time = time != null ? time : new SystemTime();
}
- public MeteredWindowStore<K, V> disableLogging() {
- loggingEnabled = false;
- return this;
- }
-
@Override
public String name() {
return inner.name();
@@ -72,24 +55,14 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
final String name = name();
this.metrics = context.metrics();
this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
- this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
- this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
+ this.fetchTime = this.metrics.addLatencySensor(metricScope, name, "fetch");
this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
- this.changeLogger = this.loggingEnabled ?
- new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null;
-
// register and possibly restore the state from the logs
long startNs = time.nanoseconds();
- inner.init(context);
try {
- context.register(this, loggingEnabled, new StateRestoreCallback() {
- @Override
- public void restore(byte[] key, byte[] value) {
- inner.putInternal(key, value);
- }
- });
+ inner.init(context);
} finally {
this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
}
@@ -102,48 +75,26 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
@Override
public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
- return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime);
+ return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.fetchTime);
}
@Override
public void put(K key, V value) {
- putAndReturnInternalKey(key, value, -1L);
- }
-
- @Override
- public void put(K key, V value, long timestamp) {
- putAndReturnInternalKey(key, value, timestamp);
- }
-
- @Override
- public byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
long startNs = time.nanoseconds();
try {
- byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp);
-
- if (loggingEnabled) {
- changeLogger.add(binKey);
- changeLogger.maybeLogChange(this.getter);
- }
-
- return binKey;
+ this.inner.put(key, value);
} finally {
this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
}
@Override
- public void putInternal(byte[] binaryKey, byte[] binaryValue) {
- inner.putInternal(binaryKey, binaryValue);
- }
-
- @Override
- public byte[] getInternal(byte[] binaryKey) {
+ public void put(K key, V value, long timestamp) {
long startNs = time.nanoseconds();
try {
- return this.inner.getInternal(binaryKey);
+ this.inner.put(key, value, timestamp);
} finally {
- this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
+ this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
}
@@ -157,9 +108,6 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
long startNs = time.nanoseconds();
try {
this.inner.flush();
-
- if (loggingEnabled)
- changeLogger.logChange(this.getter);
} finally {
this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index e276f83..853fc5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -100,7 +100,7 @@ public class OffsetCheckpoint {
public Map<TopicPartition, Long> read() throws IOException {
synchronized (lock) {
- BufferedReader reader = null;
+ BufferedReader reader;
try {
reader = new BufferedReader(new FileReader(file));
} catch (FileNotFoundException e) {
@@ -136,8 +136,7 @@ public class OffsetCheckpoint {
throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
}
} finally {
- if (reader != null)
- reader.close();
+ reader.close();
}
}
}
@@ -146,8 +145,7 @@ public class OffsetCheckpoint {
String line = reader.readLine();
if (line == null)
throw new EOFException("File ended prematurely.");
- int val = Integer.parseInt(line);
- return val;
+ return Integer.parseInt(line);
}
public void delete() throws IOException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
new file mode 100644
index 0000000..cff9d6b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStoreUtils;
+
+import java.util.Comparator;
+import java.util.TreeSet;
+
+/**
+ * A {@link StoreChangeLogger} for raw {@code byte[]} keys and values.
+ *
+ * Java arrays compare by identity in hash-based sets, so {@link #init()} sets up the dirty and
+ * removed key sets as {@link TreeSet}s ordered by byte content.
+ */
+public class RawStoreChangeLogger extends StoreChangeLogger<byte[], byte[]> {
+
+ /**
+ * Orders byte arrays lexicographically with every byte treated as unsigned; when one array
+ * is a prefix of the other, the shorter one sorts first.
+ */
+ // static: the comparator uses no state of the enclosing logger, so it should not keep a reference to it
+ private static class ByteArrayComparator implements Comparator<byte[]> {
+ @Override
+ public int compare(byte[] left, byte[] right) {
+ final int common = Math.min(left.length, right.length);
+ for (int i = 0; i < common; i++) {
+ // mask to 0..255 so the bytes compare as unsigned; the difference cannot overflow
+ int a = left[i] & 0xff;
+ int b = right[i] & 0xff;
+
+ if (a != b)
+ return a - b;
+ }
+ return left.length - right.length;
+ }
+ }
+
+ /**
+ * Creates a logger that uses DEFAULT_WRITE_BATCH_SIZE for both the dirty and the removed limit.
+ */
+ public RawStoreChangeLogger(String topic, ProcessorContext context) {
+ this(topic, context, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
+ }
+
+ /**
+ * Creates a logger for the partition of the context's task id, using
+ * WindowStoreUtils.INNER_SERDES as the serdes.
+ *
+ * @param maxDirty passed through to the superclass as its dirty-key limit
+ * @param maxRemoved passed through to the superclass as its removed-key limit
+ */
+ public RawStoreChangeLogger(String topic, ProcessorContext context, int maxDirty, int maxRemoved) {
+ super(topic, context, context.id().partition, WindowStoreUtils.INNER_SERDES, maxDirty, maxRemoved);
+ init();
+ }
+
+ /**
+ * Replaces the dirty and removed key sets with empty sets ordered by byte content.
+ */
+ @Override
+ public void init() {
+ this.dirty = new TreeSet<>(new ByteArrayComparator());
+ this.removed = new TreeSet<>(new ByteArrayComparator());
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 8c3b437..3a4c351 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -33,7 +33,7 @@ import org.apache.kafka.streams.state.Serdes;
public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
private final String name;
- private final Serdes serdes;
+ private final Serdes<K, V> serdes;
private final Time time;
public RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
@@ -47,7 +47,6 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
public StateStore get() {
- return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
+ return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes).enableLogging(), "rocksdb-state", time);
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index b324ff1..d7e229d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -20,9 +20,11 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Serdes;
+
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompactionStyle;
import org.rocksdb.CompressionType;
@@ -31,18 +33,23 @@ import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import java.io.File;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Set;
public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private static final int TTL_NOT_USED = -1;
// TODO: these values should be configurable
+ private static final int DEFAULT_UNENCODED_CACHE_SIZE = 1000;
private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
@@ -58,11 +65,31 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private final WriteOptions wOptions;
private final FlushOptions fOptions;
- private Serdes<K, V> serdes;
private ProcessorContext context;
+ private Serdes<K, V> serdes;
protected File dbDir;
private RocksDB db;
+ private boolean loggingEnabled = false;
+ private int cacheSize = DEFAULT_UNENCODED_CACHE_SIZE;
+
+ private Set<K> cacheDirtyKeys;
+ private MemoryLRUCache<K, RocksDBCacheEntry> cache;
+ private StoreChangeLogger<byte[], byte[]> changeLogger;
+ private StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+
+ /**
+ * Turns on change logging for this store. The flag is read in init(), which creates the
+ * change logger only when it is set, so this must be called before init().
+ * Returns the concrete store type, like withCacheSize(int), so both setters can be chained in any order.
+ *
+ * @return this store
+ */
+ public RocksDBStore<K, V> enableLogging() {
+ loggingEnabled = true;
+
+ return this;
+ }
+
+ /**
+ * Sets the capacity handed to the object cache. The value is read in init(), which creates
+ * the cache only when the size is positive.
+ *
+ * @param cacheSize cache capacity; a value <= 0 makes init() skip the cache entirely
+ * @return this store
+ */
+ public RocksDBStore<K, V> withCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+
+ return this;
+ }
+
public RocksDBStore(String name, Serdes<K, V> serdes) {
this.name = name;
this.serdes = serdes;
@@ -88,10 +115,63 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
fOptions.setWaitForFlush(true);
}
+ // One cached value together with a flag saying whether it still has to be written to RocksDB.
+ private class RocksDBCacheEntry {
+ public boolean isDirty;
+ public V value;
+
+ // full form: the caller decides whether the entry starts out dirty
+ public RocksDBCacheEntry(V value, boolean isDirty) {
+ this.isDirty = isDirty;
+ this.value = value;
+ }
+
+ // short form: an entry that mirrors what is already stored, hence clean
+ public RocksDBCacheEntry(V value) {
+ this.value = value;
+ this.isDirty = false;
+ }
+ }
+
+ /**
+ * Opens the RocksDB instance under the context's state directory, creates the change logger
+ * (only when logging is enabled) and the object cache (only when cacheSize is positive), and
+ * registers this store with the context together with its restore callback.
+ *
+ * @param context the processor context this store belongs to
+ */
+ @SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
+
+ this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
+
+ if (this.cacheSize > 0) {
+ this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize)
+ .whenEldestRemoved(new MemoryLRUCache.EldestEntryRemovalListener<K, RocksDBCacheEntry>() {
+ @Override
+ public void apply(K key, RocksDBCacheEntry entry) {
+ // flush all the dirty entries to RocksDB if this evicted entry is dirty
+ if (entry.isDirty) {
+ flush();
+ }
+ }
+ });
+
+
+ this.cacheDirtyKeys = new HashSet<>();
+ } else {
+ this.cache = null;
+ this.cacheDirtyKeys = null;
+ }
+
+ // value getter should always read directly from rocksDB
+ // since it is only for values that are already flushed
+ this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
+ @Override
+ public byte[] get(byte[] key) {
+ return getInternal(key);
+ }
+ };
+
+ context.register(this, loggingEnabled, new StateRestoreCallback() {
+
+ @Override
+ public void restore(byte[] key, byte[] value) {
+ // write straight to RocksDB: a restored record is not a new change, and going through
+ // putInternal() would add every restored key to the change logger again
+ try {
+ if (value == null) {
+ db.remove(wOptions, key);
+ } else {
+ db.put(wOptions, key, value);
+ }
+ } catch (RocksDBException e) {
+ throw new ProcessorStateException("Error while restoring key " + serdes.keyFrom(key) +
+ " into store " + name, e);
+ }
+ }
+ });
}
private RocksDB openDB(File dir, Options options, int ttl) {
@@ -100,12 +180,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
dir.getParentFile().mkdirs();
return RocksDB.open(options, dir.toString());
} else {
- throw new ProcessorStateException("Change log is not supported for store " + this.name + " since it is TTL based.");
+ throw new UnsupportedOperationException("Change log is not supported for store " + this.name + " since it is TTL based.");
// TODO: support TTL with change log?
// return TtlDB.open(options, dir.toString(), ttl, false);
}
} catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e);
}
}
@@ -122,25 +201,64 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+ /**
+ * Returns the value for the given key. When the object cache is enabled it is consulted first;
+ * on a miss the value is read from RocksDB and remembered as a clean cache entry.
+ *
+ * @param key the key to look up
+ * @return the cached value, or the deserialized value read from RocksDB
+ */
@Override
public V get(K key) {
+ if (cache != null) {
+ RocksDBCacheEntry entry = cache.get(key);
+
+ if (entry == null) {
+ // serialize the key once and reuse the bytes for the lookup
+ byte[] rawKey = serdes.rawKey(key);
+ V value = serdes.valueFrom(getInternal(rawKey));
+ cache.put(key, new RocksDBCacheEntry(value));
+
+ return value;
+ } else {
+ return entry.value;
+ }
+ } else {
+ return serdes.valueFrom(getInternal(serdes.rawKey(key)));
+ }
+ }
+
+ private byte[] getInternal(byte[] rawKey) { // reads the serialized value for a serialized key directly from RocksDB, without consulting the cache
try {
- return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
+ return this.db.get(rawKey);
} catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new ProcessorStateException("Error while executing get " + key.toString() + " from store " + this.name, e);
+ throw new ProcessorStateException("Error while getting value for key " + serdes.keyFrom(rawKey) + // the key is deserialized only to build the message
+ " from store " + this.name, e);
}
}
+ /**
+ * Stores the value for the key; a null value removes the key.
+ * With the cache enabled the write is only recorded as a dirty cache entry and reaches RocksDB
+ * on a later flush(); without the cache it is written through putInternal() immediately.
+ *
+ * @param key the key to write
+ * @param value the new value, or null to delete the key
+ */
@Override
public void put(K key, V value) {
- try {
- if (value == null) {
- db.remove(wOptions, serdes.rawKey(key));
- } else {
- db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
+ if (cache != null) {
+ cache.put(key, new RocksDBCacheEntry(value, true));
+ cacheDirtyKeys.add(key);
+ } else {
+ byte[] rawKey = serdes.rawKey(key);
+ // null means delete: keep it away from the serializer, mirroring the null check in flush()
+ byte[] rawValue = value == null ? null : serdes.rawValue(value);
+ putInternal(rawKey, rawValue);
+ }
+ }
+
+ /**
+ * Applies one serialized write to RocksDB: a null value removes the key, anything else is stored.
+ * When logging is enabled the key is also handed to the change logger.
+ *
+ * @param rawKey serialized key
+ * @param rawValue serialized value, or null to remove the key
+ * @throws ProcessorStateException if RocksDB reports an error
+ */
+ private void putInternal(byte[] rawKey, byte[] rawValue) {
+ if (rawValue == null) {
+ try {
+ db.remove(wOptions, rawKey);
+ } catch (RocksDBException e) {
+ throw new ProcessorStateException("Error while removing key " + serdes.keyFrom(rawKey) +
+ " from store " + this.name, e);
}
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new ProcessorStateException("Error while executing put " + key.toString() + " from store " + this.name, e);
+ } else {
+ try {
+ db.put(wOptions, rawKey, rawValue);
+ } catch (RocksDBException e) {
+ // the value bytes must go through the value deserializer, not the key one
+ throw new ProcessorStateException("Error while executing put key " + serdes.keyFrom(rawKey) +
+ " and value " + serdes.valueFrom(rawValue) + " from store " + this.name, e);
+ }
+ }
+
+ if (loggingEnabled) {
+ changeLogger.add(rawKey);
+ changeLogger.maybeLogChange(this.getter);
}
}
@@ -150,6 +268,21 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
put(entry.key, entry.value);
}
+ /**
+ * Writes all given serialized entries to RocksDB in a single batch.
+ * This function is only called in flush().
+ *
+ * @param entries serialized key/value pairs to store
+ * @throws ProcessorStateException if RocksDB rejects the batch
+ */
+ private void putAllInternal(List<KeyValue<byte[], byte[]>> entries) {
+ // nothing to write, so do not allocate a native batch at all
+ if (entries.isEmpty()) {
+ return;
+ }
+
+ WriteBatch batch = new WriteBatch();
+
+ try {
+ for (KeyValue<byte[], byte[]> entry : entries) {
+ batch.put(entry.key, entry.value);
+ }
+
+ db.write(wOptions, batch);
+ } catch (RocksDBException e) {
+ throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
+ } finally {
+ // WriteBatch wraps a native object; release it now instead of waiting for finalization
+ batch.dispose();
+ }
+ }
+
@Override
public V delete(K key) {
V value = get(key);
@@ -159,11 +292,19 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public KeyValueIterator<K, V> range(K from, K to) {
+ // the iterator reads RocksDB directly, so pending cached writes have to be flushed first
+ if (cache != null) {
+ flush();
+ }
+
return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
}
@Override
public KeyValueIterator<K, V> all() {
+ // we need to flush the cache if necessary before returning the iterator
+ if (cache != null)
+ flush();
+
RocksIterator innerIter = db.newIterator();
innerIter.seekToFirst();
return new RocksDbIterator<K, V>(innerIter, serdes);
@@ -171,10 +312,60 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+ /**
+ * Writes every dirty cache entry to RocksDB (puts in one batch, removals one by one), hands the
+ * touched keys to the change logger when logging is enabled, then flushes RocksDB itself and
+ * logs the collected changes.
+ *
+ * @throws ProcessorStateException if RocksDB reports an error
+ */
@Override
public void flush() {
+ // flush the dirty cache entries if the cache is enabled
+ if (cache != null) {
+ // only dirty keys are written, so size the work lists by the dirty set
+ List<KeyValue<byte[], byte[]>> putBatch = new ArrayList<>(cacheDirtyKeys.size());
+ List<byte[]> deleteBatch = new ArrayList<>();
+ List<RocksDBCacheEntry> flushedEntries = new ArrayList<>(cacheDirtyKeys.size());
+
+ for (K key : cacheDirtyKeys) {
+ RocksDBCacheEntry entry = cache.get(key);
+
+ assert entry.isDirty;
+
+ byte[] rawKey = serdes.rawKey(key);
+
+ if (entry.value != null) {
+ putBatch.add(new KeyValue<>(rawKey, serdes.rawValue(entry.value)));
+ } else {
+ deleteBatch.add(rawKey);
+ }
+
+ flushedEntries.add(entry);
+ }
+
+ putAllInternal(putBatch);
+
+ if (loggingEnabled) {
+ for (KeyValue<byte[], byte[]> kv : putBatch)
+ changeLogger.add(kv.key);
+ }
+
+ // check all removed entries and remove them in rocksDB
+ // TODO: can this be done in batch as well?
+ for (byte[] removedKey : deleteBatch) {
+ try {
+ db.remove(wOptions, removedKey);
+ } catch (RocksDBException e) {
+ throw new ProcessorStateException("Error while deleting with key " + serdes.keyFrom(removedKey) + " from store " + this.name, e);
+ }
+
+ if (loggingEnabled) {
+ changeLogger.delete(removedKey);
+ }
+ }
+
+ // all writes succeeded: mark the entries clean so that evicting one of them later
+ // does not trigger another full flush from the eviction listener
+ for (RocksDBCacheEntry flushed : flushedEntries) {
+ flushed.isDirty = false;
+ }
+
+ // reset dirty set
+ cacheDirtyKeys.clear();
+ }
+
+ flushInternal();
+
+ if (loggingEnabled)
+ changeLogger.logChange(getter);
+ }
+
+ public void flushInternal() { // flushes RocksDB itself using fOptions; the object cache is not touched here
try {
db.flush(fOptions);
} catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
}
}