You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/21 01:10:50 UTC
[4/5] kafka git commit: KAFKA-3121: Remove aggregatorSupplier and add
Reduce functions
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
deleted file mode 100644
index 743a110..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.Time;
-
-import java.util.List;
-
-public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
-
- protected final KeyValueStore<K, V> inner;
- protected final StoreChangeLogger.ValueGetter getter;
- protected final Serdes<K, V> serialization;
- protected final String metricScope;
- protected final Time time;
-
- private Sensor putTime;
- private Sensor getTime;
- private Sensor deleteTime;
- private Sensor putAllTime;
- private Sensor allTime;
- private Sensor rangeTime;
- private Sensor flushTime;
- private Sensor restoreTime;
- private StreamingMetrics metrics;
-
- private boolean loggingEnabled = true;
- private StoreChangeLogger<K, V> changeLogger = null;
-
- // always wrap the store with the metered store
- public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
- this.inner = inner;
- this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
- public V get(K key) {
- return inner.get(key);
- }
- };
- this.serialization = serialization;
- this.metricScope = metricScope;
- this.time = time != null ? time : new SystemTime();
- }
-
- public MeteredKeyValueStore<K, V> disableLogging() {
- loggingEnabled = false;
- return this;
- }
-
- @Override
- public String name() {
- return inner.name();
- }
-
- @Override
- public void init(ProcessorContext context) {
- final String name = name();
- this.metrics = context.metrics();
- this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
- this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
- this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete");
- this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all");
- this.allTime = this.metrics.addLatencySensor(metricScope, name, "all");
- this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
- this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
- this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
-
- serialization.init(context);
- this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
-
- // register and possibly restore the state from the logs
- long startNs = time.nanoseconds();
- inner.init(context);
- try {
- final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
- final Deserializer<V> valDeserializer = serialization.valueDeserializer();
-
- context.register(this, loggingEnabled, new StateRestoreCallback() {
- @Override
- public void restore(byte[] key, byte[] value) {
- inner.put(keyDeserializer.deserialize(name, key),
- valDeserializer.deserialize(name, value));
- }
- });
- } finally {
- this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public boolean persistent() {
- return inner.persistent();
- }
-
- @Override
- public V get(K key) {
- long startNs = time.nanoseconds();
- try {
- return this.inner.get(key);
- } finally {
- this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public void put(K key, V value) {
- long startNs = time.nanoseconds();
- try {
- this.inner.put(key, value);
-
- if (loggingEnabled) {
- changeLogger.add(key);
- changeLogger.maybeLogChange(this.getter);
- }
- } finally {
- this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- long startNs = time.nanoseconds();
- try {
- this.inner.putAll(entries);
-
- if (loggingEnabled) {
- for (Entry<K, V> entry : entries) {
- K key = entry.key();
- changeLogger.add(key);
- }
- changeLogger.maybeLogChange(this.getter);
- }
- } finally {
- this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public V delete(K key) {
- long startNs = time.nanoseconds();
- try {
- V value = this.inner.delete(key);
-
- removed(key);
-
- return value;
- } finally {
- this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
- }
- }
-
- /**
- * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
- * store.
- *
- * @param key the key for the entry that the inner store removed
- */
- protected void removed(K key) {
- if (loggingEnabled) {
- changeLogger.delete(key);
- changeLogger.maybeLogChange(this.getter);
- }
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
- }
-
- @Override
- public void close() {
- inner.close();
- }
-
- @Override
- public void flush() {
- long startNs = time.nanoseconds();
- try {
- this.inner.flush();
-
- if (loggingEnabled)
- changeLogger.logChange(this.getter);
- } finally {
- this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
- }
- }
-
- private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
-
- private final KeyValueIterator<K1, V1> iter;
- private final Sensor sensor;
- private final long startNs;
-
- public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
- this.iter = iter;
- this.sensor = sensor;
- this.startNs = time.nanoseconds();
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public Entry<K1, V1> next() {
- return iter.next();
- }
-
- @Override
- public void remove() {
- iter.remove();
- }
-
- @Override
- public void close() {
- try {
- iter.close();
- } finally {
- metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
deleted file mode 100644
index cfcfb00..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-
-public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
-
- protected final WindowStore<K, V> inner;
- protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
- protected final String metricScope;
- protected final Time time;
-
- private Sensor putTime;
- private Sensor getTime;
- private Sensor rangeTime;
- private Sensor flushTime;
- private Sensor restoreTime;
- private StreamingMetrics metrics;
-
- private boolean loggingEnabled = true;
- private StoreChangeLogger<byte[], byte[]> changeLogger = null;
-
- // always wrap the store with the metered store
- public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
- this.inner = inner;
- this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
- public byte[] get(byte[] key) {
- return inner.getInternal(key);
- }
- };
- this.metricScope = metricScope;
- this.time = time != null ? time : new SystemTime();
- }
-
- public MeteredWindowStore<K, V> disableLogging() {
- loggingEnabled = false;
- return this;
- }
-
- @Override
- public String name() {
- return inner.name();
- }
-
- @Override
- public void init(ProcessorContext context) {
- final String name = name();
- this.metrics = context.metrics();
- this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
- this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
- this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
- this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
- this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
-
- this.changeLogger = this.loggingEnabled ?
- new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null;
-
- // register and possibly restore the state from the logs
- long startNs = time.nanoseconds();
- inner.init(context);
- try {
- context.register(this, loggingEnabled, new StateRestoreCallback() {
- @Override
- public void restore(byte[] key, byte[] value) {
- inner.putInternal(key, value);
- }
- });
- } finally {
- this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public boolean persistent() {
- return inner.persistent();
- }
-
- @Override
- public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
- return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime);
- }
-
- @Override
- public void put(K key, V value) {
- putAndReturnInternalKey(key, value, -1L);
- }
-
- @Override
- public void put(K key, V value, long timestamp) {
- putAndReturnInternalKey(key, value, timestamp);
- }
-
- @Override
- public byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
- long startNs = time.nanoseconds();
- try {
- byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp);
-
- if (loggingEnabled) {
- changeLogger.add(binKey);
- changeLogger.maybeLogChange(this.getter);
- }
-
- return binKey;
- } finally {
- this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public void putInternal(byte[] binaryKey, byte[] binaryValue) {
- inner.putInternal(binaryKey, binaryValue);
- }
-
- @Override
- public byte[] getInternal(byte[] binaryKey) {
- long startNs = time.nanoseconds();
- try {
- return this.inner.getInternal(binaryKey);
- } finally {
- this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public void close() {
- inner.close();
- }
-
- @Override
- public void flush() {
- long startNs = time.nanoseconds();
- try {
- this.inner.flush();
-
- if (loggingEnabled)
- changeLogger.logChange(this.getter);
- } finally {
- this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
- }
- }
-
- private class MeteredWindowStoreIterator<E> implements WindowStoreIterator<E> {
-
- private final WindowStoreIterator<E> iter;
- private final Sensor sensor;
- private final long startNs;
-
- public MeteredWindowStoreIterator(WindowStoreIterator<E> iter, Sensor sensor) {
- this.iter = iter;
- this.sensor = sensor;
- this.startNs = time.nanoseconds();
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public KeyValue<Long, E> next() {
- return iter.next();
- }
-
- @Override
- public void remove() {
- iter.remove();
- }
-
- @Override
- public void close() {
- try {
- iter.close();
- } finally {
- metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
- }
- }
-
- }
-
- WindowStore<K, V> inner() {
- return inner;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
deleted file mode 100644
index d748aac..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Utils;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class saves out a map of topic/partition=>offsets to a file. The format of the file is UTF-8 text containing the following:
- * <pre>
- * <version>
- * <n>
- * <topic_name_1> <partition_1> <offset_1>
- * .
- * .
- * .
- * <topic_name_n> <partition_n> <offset_n>
- * </pre>
- * The first line contains a number designating the format version (currently 0), the get line contains
- * a number giving the total number of offsets. Each successive line gives a topic/partition/offset triple
- * separated by spaces.
- */
-public class OffsetCheckpoint {
-
- private static final int VERSION = 0;
-
- private final File file;
- private final Object lock;
-
- public OffsetCheckpoint(File file) throws IOException {
- this.file = file;
- this.lock = new Object();
- }
-
- public void write(Map<TopicPartition, Long> offsets) throws IOException {
- synchronized (lock) {
- // write to temp file and then swap with the existing file
- File temp = new File(file.getAbsolutePath() + ".tmp");
-
- FileOutputStream fileOutputStream = new FileOutputStream(temp);
- BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream));
- try {
- writeIntLine(writer, VERSION);
- writeIntLine(writer, offsets.size());
-
- for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
- writeEntry(writer, entry.getKey(), entry.getValue());
-
- writer.flush();
- fileOutputStream.getFD().sync();
- } finally {
- writer.close();
- }
-
- Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
- }
- }
-
- private void writeIntLine(BufferedWriter writer, int number) throws IOException {
- writer.write(Integer.toString(number));
- writer.newLine();
- }
-
- private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException {
- writer.write(part.topic());
- writer.write(' ');
- writer.write(Integer.toString(part.partition()));
- writer.write(' ');
- writer.write(Long.toString(offset));
- writer.newLine();
- }
-
- public Map<TopicPartition, Long> read() throws IOException {
- synchronized (lock) {
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new FileReader(file));
- } catch (FileNotFoundException e) {
- return Collections.emptyMap();
- }
-
- try {
- int version = readInt(reader);
- switch (version) {
- case 0:
- int expectedSize = readInt(reader);
- Map<TopicPartition, Long> offsets = new HashMap<>();
- String line = reader.readLine();
- while (line != null) {
- String[] pieces = line.split("\\s+");
- if (pieces.length != 3)
- throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.",
- line));
-
- String topic = pieces[0];
- int partition = Integer.parseInt(pieces[1]);
- long offset = Long.parseLong(pieces[2]);
- offsets.put(new TopicPartition(topic, partition), offset);
- line = reader.readLine();
- }
- if (offsets.size() != expectedSize)
- throw new IOException(String.format("Expected %d entries but found only %d",
- expectedSize,
- offsets.size()));
- return offsets;
-
- default:
- throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
- }
- } finally {
- if (reader != null)
- reader.close();
- }
- }
- }
-
- private int readInt(BufferedReader reader) throws IOException {
- String line = reader.readLine();
- if (line == null)
- throw new EOFException("File ended prematurely.");
- int val = Integer.parseInt(line);
- return val;
- }
-
- public void delete() throws IOException {
- file.delete();
- }
-
- @Override
- public String toString() {
- return this.file.getAbsolutePath();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
deleted file mode 100644
index 41314b9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-/**
- * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see Stores#create(String)
- */
-public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
-
- private final String name;
- private final Serdes serdes;
- private final Time time;
-
- protected RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
- this.name = name;
- this.serdes = serdes;
- this.time = time;
- }
-
- public String name() {
- return name;
- }
-
- public StateStore get() {
- return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
deleted file mode 100644
index 62b9f2c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
-
- private static final int TTL_NOT_USED = -1;
-
- // TODO: these values should be configurable
- private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
- private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
- private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
- private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
- private static final long BLOCK_SIZE = 4096L;
- private static final int TTL_SECONDS = TTL_NOT_USED;
- private static final int MAX_WRITE_BUFFERS = 3;
- private static final String DB_FILE_DIR = "rocksdb";
-
- private final String name;
-
- private final Options options;
- private final WriteOptions wOptions;
- private final FlushOptions fOptions;
-
- private Serdes<K, V> serdes;
- private ProcessorContext context;
- protected File dbDir;
- private RocksDB db;
-
- public RocksDBStore(String name, Serdes<K, V> serdes) {
- this.name = name;
- this.serdes = serdes;
-
- // initialize the rocksdb options
- BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
- tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
- tableConfig.setBlockSize(BLOCK_SIZE);
-
- options = new Options();
- options.setTableFormatConfig(tableConfig);
- options.setWriteBufferSize(WRITE_BUFFER_SIZE);
- options.setCompressionType(COMPRESSION_TYPE);
- options.setCompactionStyle(COMPACTION_STYLE);
- options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
- options.setCreateIfMissing(true);
- options.setErrorIfExists(false);
-
- wOptions = new WriteOptions();
- wOptions.setDisableWAL(true);
-
- fOptions = new FlushOptions();
- fOptions.setWaitForFlush(true);
- }
-
- public void init(ProcessorContext context) {
- serdes.init(context);
-
- this.context = context;
- this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
- this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
- }
-
- private RocksDB openDB(File dir, Options options, int ttl) {
- try {
- if (ttl == TTL_NOT_USED) {
- dir.getParentFile().mkdirs();
- return RocksDB.open(options, dir.toString());
- } else {
- throw new KafkaException("Change log is not supported for store " + this.name + " since it is TTL based.");
- // TODO: support TTL with change log?
- // return TtlDB.open(options, dir.toString(), ttl, false);
- }
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error opening store " + this.name + " at location " + dir.toString(), e);
- }
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- try {
- return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.name, e);
- }
- }
-
- @Override
- public void put(K key, V value) {
- try {
- if (value == null) {
- db.remove(wOptions, serdes.rawKey(key));
- } else {
- db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
- }
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.name, e);
- }
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- V value = get(key);
- put(key, null);
- return value;
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- RocksIterator innerIter = db.newIterator();
- innerIter.seekToFirst();
- return new RocksDbIterator<K, V>(innerIter, serdes);
- }
-
- @Override
- public void flush() {
- try {
- db.flush(fOptions);
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing flush from store " + this.name, e);
- }
- }
-
- @Override
- public void close() {
- flush();
- db.close();
- }
-
- private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
- private final RocksIterator iter;
- private final Serdes<K, V> serdes;
-
- public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
- this.iter = iter;
- this.serdes = serdes;
- }
-
- protected byte[] peekRawKey() {
- return iter.key();
- }
-
- protected Entry<K, V> getEntry() {
- return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
- }
-
- @Override
- public boolean hasNext() {
- return iter.isValid();
- }
-
- @Override
- public Entry<K, V> next() {
- if (!hasNext())
- throw new NoSuchElementException();
-
- Entry<K, V> entry = this.getEntry();
- iter.next();
- return entry;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("RocksDB iterator does not support remove");
- }
-
- @Override
- public void close() {
- iter.dispose();
- }
-
- }
-
- private static class LexicographicComparator implements Comparator<byte[]> {
-
- @Override
- public int compare(byte[] left, byte[] right) {
- for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
- int leftByte = left[i] & 0xff;
- int rightByte = right[j] & 0xff;
- if (leftByte != rightByte) {
- return leftByte - rightByte;
- }
- }
- return left.length - right.length;
- }
- }
-
- private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
- // RocksDB's JNI interface does not expose getters/setters that allow the
- // comparator to be pluggable, and the default is lexicographic, so it's
- // safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator = new LexicographicComparator();
- byte[] rawToKey;
-
- public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
- K from, K to) {
- super(iter, serdes);
- iter.seek(serdes.rawKey(from));
- this.rawToKey = serdes.rawKey(to);
- }
-
- @Override
- public boolean hasNext() {
- return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
deleted file mode 100644
index 2f30712..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.SimpleTimeZone;
-
-public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
-
- public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute
-
- private static final long USE_CURRENT_TIMESTAMP = -1L;
-
- private static class Segment extends RocksDBStore<byte[], byte[]> {
- public final long id;
-
- Segment(String name, long id) {
- super(name, WindowStoreUtil.INNER_SERDES);
- this.id = id;
- }
-
- public void destroy() {
- Utils.delete(dbDir);
- }
- }
-
- private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
- private final Serdes<?, V> serdes;
- private final KeyValueIterator<byte[], byte[]>[] iterators;
- private int index = 0;
-
- RocksDBWindowStoreIterator(Serdes<?, V> serdes) {
- this(serdes, WindowStoreUtil.NO_ITERATORS);
- }
-
- RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
- this.serdes = serdes;
- this.iterators = iterators;
- }
-
- @Override
- public boolean hasNext() {
- while (index < iterators.length) {
- if (iterators[index].hasNext())
- return true;
-
- index++;
- }
- return false;
- }
-
- @Override
- public KeyValue<Long, V> next() {
- if (index >= iterators.length)
- throw new NoSuchElementException();
-
- Entry<byte[], byte[]> entry = iterators[index].next();
-
- return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()),
- serdes.valueFrom(entry.value()));
- }
-
- @Override
- public void remove() {
- if (index < iterators.length)
- iterators[index].remove();
- }
-
- @Override
- public void close() {
- for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
- iterator.close();
- }
- }
- }
-
- private final String name;
- private final long segmentInterval;
- private final boolean retainDuplicates;
- private final Segment[] segments;
- private final Serdes<K, V> serdes;
- private final SimpleDateFormat formatter;
-
- private ProcessorContext context;
- private long currentSegmentId = -1L;
- private int seqnum = 0;
-
- public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) {
- this.name = name;
-
- // The segment interval must be greater than MIN_SEGMENT_INTERVAL
- this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
-
- this.segments = new Segment[numSegments];
- this.serdes = serdes;
-
- this.retainDuplicates = retainDuplicates;
-
- // Create a date formatter. Formatted timestamps are used as segment name suffixes
- this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
- this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public void init(ProcessorContext context) {
- this.context = context;
- }
-
- @Override
- public boolean persistent() {
- return true;
- }
-
- @Override
- public void flush() {
- for (Segment segment : segments) {
- if (segment != null)
- segment.flush();
- }
- }
-
- @Override
- public void close() {
- for (Segment segment : segments) {
- if (segment != null)
- segment.close();
- }
- }
-
- @Override
- public void put(K key, V value) {
- putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
- }
-
- @Override
- public void put(K key, V value, long timestamp) {
- putAndReturnInternalKey(key, value, timestamp);
- }
-
- @Override
- public byte[] putAndReturnInternalKey(K key, V value, long t) {
- long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t;
-
- long segmentId = segmentId(timestamp);
-
- if (segmentId > currentSegmentId) {
- // A new segment will be created. Clean up old segments first.
- currentSegmentId = segmentId;
- cleanup();
- }
-
- // If the record is within the retention period, put it in the store.
- if (segmentId > currentSegmentId - segments.length) {
- if (retainDuplicates)
- seqnum = (seqnum + 1) & 0x7FFFFFFF;
- byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes);
- getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
- return binaryKey;
- } else {
- return null;
- }
- }
-
- @Override
- public void putInternal(byte[] binaryKey, byte[] binaryValue) {
- long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
-
- if (segmentId > currentSegmentId) {
- // A new segment will be created. Clean up old segments first.
- currentSegmentId = segmentId;
- cleanup();
- }
-
- // If the record is within the retention period, put it in the store.
- if (segmentId > currentSegmentId - segments.length)
- getSegment(segmentId).put(binaryKey, binaryValue);
- }
-
- @Override
- public byte[] getInternal(byte[] binaryKey) {
- long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
-
- Segment segment = segments[(int) (segmentId % segments.length)];
-
- if (segment != null && segment.id == segmentId) {
- return segment.get(binaryKey);
- } else {
- return null;
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
- long segFrom = segmentId(timeFrom);
- long segTo = segmentId(Math.max(0L, timeTo));
-
- byte[] binaryFrom = WindowStoreUtil.toBinaryKey(key, timeFrom, 0, serdes);
- byte[] binaryUntil = WindowStoreUtil.toBinaryKey(key, timeTo + 1L, 0, serdes);
-
- ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
-
- for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
- Segment segment = segments[(int) (segmentId % segments.length)];
-
- if (segment != null && segment.id == segmentId)
- iterators.add(segment.range(binaryFrom, binaryUntil));
- }
-
- if (iterators.size() > 0) {
- return new RocksDBWindowStoreIterator<>(serdes, iterators.toArray(new KeyValueIterator[iterators.size()]));
- } else {
- return new RocksDBWindowStoreIterator<>(serdes);
- }
- }
-
- private Segment getSegment(long segmentId) {
- int index = (int) (segmentId % segments.length);
-
- if (segments[index] == null) {
- segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId);
- segments[index].init(context);
- }
-
- return segments[index];
- }
-
- private void cleanup() {
- for (int i = 0; i < segments.length; i++) {
- if (segments[i] != null && segments[i].id <= currentSegmentId - segments.length) {
- segments[i].close();
- segments[i].destroy();
- segments[i] = null;
- }
- }
- }
-
- public long segmentId(long timestamp) {
- return timestamp / segmentInterval;
- }
-
- public String directorySuffix(long segmentId) {
- return formatter.format(new Date(segmentId * segmentInterval));
- }
-
- // this method is used by a test
- public Set<Long> segmentIds() {
- HashSet<Long> segmentIds = new HashSet<>();
-
- for (Segment segment : segments) {
- if (segment != null)
- segmentIds.add(segment.id);
- }
-
- return segmentIds;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
deleted file mode 100644
index fcdcb9b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-/**
- * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see Stores#create(String)
- */
-public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
-
- private final String name;
- private final long retentionPeriod;
- private final boolean retainDuplicates;
- private final int numSegments;
- private final Serdes serdes;
- private final Time time;
-
- public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
- this.name = name;
- this.retentionPeriod = retentionPeriod;
- this.retainDuplicates = retainDuplicates;
- this.numSegments = numSegments;
- this.serdes = serdes;
- this.time = time;
- }
-
- public String name() {
- return name;
- }
-
- public StateStore get() {
- return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
deleted file mode 100644
index ee6624e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class StoreChangeLogger<K, V> {
-
- public interface ValueGetter<K, V> {
- V get(K key);
- }
-
- protected final Serdes<K, V> serialization;
-
- private final Set<K> dirty;
- private final Set<K> removed;
- private final int maxDirty;
- private final int maxRemoved;
-
- private final String topic;
- private int partition;
- private ProcessorContext context;
-
- // always wrap the logged store with the metered store
- public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
- this.topic = topic;
- this.serialization = serialization;
- this.context = context;
- this.partition = context.id().partition;
-
- this.dirty = new HashSet<>();
- this.removed = new HashSet<>();
- this.maxDirty = 100; // TODO: this needs to be configurable
- this.maxRemoved = 100; // TODO: this needs to be configurable
- }
-
- public void add(K key) {
- this.dirty.add(key);
- this.removed.remove(key);
- }
-
- public void delete(K key) {
- this.dirty.remove(key);
- this.removed.add(key);
- }
-
- public void maybeLogChange(ValueGetter<K, V> getter) {
- if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
- logChange(getter);
- }
-
- public void logChange(ValueGetter<K, V> getter) {
- RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
- if (collector != null) {
- Serializer<K> keySerializer = serialization.keySerializer();
- Serializer<V> valueSerializer = serialization.valueSerializer();
-
- for (K k : this.removed) {
- collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
- }
- for (K k : this.dirty) {
- V v = getter.get(k);
- collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
- }
- this.removed.clear();
- this.dirty.clear();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 5452040..46b2592 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -27,6 +27,9 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
+import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
/**
* Factory for creating key-value stores.
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
new file mode 100644
index 0000000..286db1b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * An in-memory key-value store based on a TreeMap.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
+public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final Serdes serdes;
+ private final Time time;
+
+ public InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+ this.name = name;
+ this.serdes = serdes;
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
+ }
+
+ private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
+
+ private final String name;
+ private final NavigableMap<K, V> map;
+
+ public MemoryStore(String name) {
+ super();
+ this.name = name;
+ this.map = new TreeMap<>();
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ return this.map.get(key);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ this.map.put(key, value);
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ return this.map.remove(key);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
+ }
+
+ @Override
+ public void flush() {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public void close() {
+ // do-nothing
+ }
+
+ private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
+ private final Iterator<Map.Entry<K, V>> iter;
+
+ public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
+ this.iter = iter;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ Map.Entry<K, V> entry = iter.next();
+ return new Entry<>(entry.getKey(), entry.getValue());
+ }
+
+ @Override
+ public void remove() {
+ iter.remove();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
new file mode 100644
index 0000000..6a38423
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ */
+public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final int capacity;
+ private final Serdes serdes;
+ private final Time time;
+
+ public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
+ this.name = name;
+ this.capacity = capacity;
+ this.serdes = serdes;
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
+ final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
+ cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
+ @Override
+ public void apply(K key, V value) {
+ store.removed(key);
+ }
+ });
+ return store;
+ }
+
+ private static interface EldestEntryRemovalListener<K, V> {
+ public void apply(K key, V value);
+ }
+
+ protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+
+ private final String name;
+ private final Map<K, V> map;
+ private final NavigableSet<K> keys;
+ private EldestEntryRemovalListener<K, V> listener;
+
+ public MemoryLRUCache(String name, final int maxCacheSize) {
+ this.name = name;
+ this.keys = new TreeSet<>();
+ // leave room for one extra entry to handle adding an entry before the oldest can be removed
+ this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ if (size() > maxCacheSize) {
+ K key = eldest.getKey();
+ keys.remove(key);
+ if (listener != null) listener.apply(key, eldest.getValue());
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ return this.map.get(key);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ this.map.put(key, value);
+ this.keys.add(key);
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ V value = this.map.remove(key);
+ this.keys.remove(key);
+ return value;
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
+ }
+
+ @Override
+ public void flush() {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public void close() {
+ // do-nothing
+ }
+
+ private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
+ private final Iterator<K> keys;
+ private final Map<K, V> entries;
+ private K lastKey;
+
+ public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
+ this.keys = keys;
+ this.entries = entries;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return keys.hasNext();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ lastKey = keys.next();
+ return new Entry<>(lastKey, entries.get(lastKey));
+ }
+
+ @Override
+ public void remove() {
+ keys.remove();
+ entries.remove(lastKey);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
new file mode 100644
index 0000000..21f73b0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.List;
+
+public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
+
+ protected final KeyValueStore<K, V> inner;
+ protected final StoreChangeLogger.ValueGetter getter;
+ protected final Serdes<K, V> serialization;
+ protected final String metricScope;
+ protected final Time time;
+
+ private Sensor putTime;
+ private Sensor getTime;
+ private Sensor deleteTime;
+ private Sensor putAllTime;
+ private Sensor allTime;
+ private Sensor rangeTime;
+ private Sensor flushTime;
+ private Sensor restoreTime;
+ private StreamingMetrics metrics;
+
+ private boolean loggingEnabled = true;
+ private StoreChangeLogger<K, V> changeLogger = null;
+
+ // always wrap the store with the metered store
+ public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
+ this.inner = inner;
+ this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
+ public V get(K key) {
+ return inner.get(key);
+ }
+ };
+ this.serialization = serialization;
+ this.metricScope = metricScope;
+ this.time = time != null ? time : new SystemTime();
+ }
+
+ public MeteredKeyValueStore<K, V> disableLogging() {
+ loggingEnabled = false;
+ return this;
+ }
+
+ @Override
+ public String name() {
+ return inner.name();
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ final String name = name();
+ this.metrics = context.metrics();
+ this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
+ this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
+ this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete");
+ this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all");
+ this.allTime = this.metrics.addLatencySensor(metricScope, name, "all");
+ this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
+ this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
+ this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
+
+ serialization.init(context);
+ this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
+
+ // register and possibly restore the state from the logs
+ long startNs = time.nanoseconds();
+ inner.init(context);
+ try {
+ final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
+ final Deserializer<V> valDeserializer = serialization.valueDeserializer();
+
+ context.register(this, loggingEnabled, new StateRestoreCallback() {
+ @Override
+ public void restore(byte[] key, byte[] value) {
+ inner.put(keyDeserializer.deserialize(name, key),
+ valDeserializer.deserialize(name, value));
+ }
+ });
+ } finally {
+ this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public boolean persistent() {
+ return inner.persistent();
+ }
+
+ @Override
+ public V get(K key) {
+ long startNs = time.nanoseconds();
+ try {
+ return this.inner.get(key);
+ } finally {
+ this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public void put(K key, V value) {
+ long startNs = time.nanoseconds();
+ try {
+ this.inner.put(key, value);
+
+ if (loggingEnabled) {
+ changeLogger.add(key);
+ changeLogger.maybeLogChange(this.getter);
+ }
+ } finally {
+ this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ long startNs = time.nanoseconds();
+ try {
+ this.inner.putAll(entries);
+
+ if (loggingEnabled) {
+ for (Entry<K, V> entry : entries) {
+ K key = entry.key();
+ changeLogger.add(key);
+ }
+ changeLogger.maybeLogChange(this.getter);
+ }
+ } finally {
+ this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public V delete(K key) {
+ long startNs = time.nanoseconds();
+ try {
+ V value = this.inner.delete(key);
+
+ removed(key);
+
+ return value;
+ } finally {
+ this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
+ }
+ }
+
+ /**
+ * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
+ * store.
+ *
+ * @param key the key for the entry that the inner store removed
+ */
+ protected void removed(K key) {
+ if (loggingEnabled) {
+ changeLogger.delete(key);
+ changeLogger.maybeLogChange(this.getter);
+ }
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public void flush() {
+ long startNs = time.nanoseconds();
+ try {
+ this.inner.flush();
+
+ if (loggingEnabled)
+ changeLogger.logChange(this.getter);
+ } finally {
+ this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
+ }
+ }
+
+ private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
+
+ private final KeyValueIterator<K1, V1> iter;
+ private final Sensor sensor;
+ private final long startNs;
+
+ public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
+ this.iter = iter;
+ this.sensor = sensor;
+ this.startNs = time.nanoseconds();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Entry<K1, V1> next() {
+ return iter.next();
+ }
+
+ @Override
+ public void remove() {
+ iter.remove();
+ }
+
+ @Override
+ public void close() {
+ try {
+ iter.close();
+ } finally {
+ metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
new file mode 100644
index 0000000..821927d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
+
+ protected final WindowStore<K, V> inner;
+ protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+ protected final String metricScope;
+ protected final Time time;
+
+ private Sensor putTime;
+ private Sensor getTime;
+ private Sensor rangeTime;
+ private Sensor flushTime;
+ private Sensor restoreTime;
+ private StreamingMetrics metrics;
+
+ private boolean loggingEnabled = true;
+ private StoreChangeLogger<byte[], byte[]> changeLogger = null;
+
+ // always wrap the store with the metered store
+ public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
+ this.inner = inner;
+ this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
+ public byte[] get(byte[] key) {
+ return inner.getInternal(key);
+ }
+ };
+ this.metricScope = metricScope;
+ this.time = time != null ? time : new SystemTime();
+ }
+
+ public MeteredWindowStore<K, V> disableLogging() {
+ loggingEnabled = false;
+ return this;
+ }
+
+ @Override
+ public String name() {
+ return inner.name();
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ final String name = name();
+ this.metrics = context.metrics();
+ this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
+ this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
+ this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
+ this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
+ this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
+
+ this.changeLogger = this.loggingEnabled ?
+ new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null;
+
+ // register and possibly restore the state from the logs
+ long startNs = time.nanoseconds();
+ inner.init(context);
+ try {
+ context.register(this, loggingEnabled, new StateRestoreCallback() {
+ @Override
+ public void restore(byte[] key, byte[] value) {
+ inner.putInternal(key, value);
+ }
+ });
+ } finally {
+ this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public boolean persistent() {
+ return inner.persistent();
+ }
+
+ @Override
+ public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
+ return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ putAndReturnInternalKey(key, value, -1L);
+ }
+
+ @Override
+ public void put(K key, V value, long timestamp) {
+ putAndReturnInternalKey(key, value, timestamp);
+ }
+
+ @Override
+ public byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
+ long startNs = time.nanoseconds();
+ try {
+ byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp);
+
+ if (loggingEnabled) {
+ changeLogger.add(binKey);
+ changeLogger.maybeLogChange(this.getter);
+ }
+
+ return binKey;
+ } finally {
+ this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public void putInternal(byte[] binaryKey, byte[] binaryValue) {
+ inner.putInternal(binaryKey, binaryValue);
+ }
+
+ @Override
+ public byte[] getInternal(byte[] binaryKey) {
+ long startNs = time.nanoseconds();
+ try {
+ return this.inner.getInternal(binaryKey);
+ } finally {
+ this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public void flush() {
+ long startNs = time.nanoseconds();
+ try {
+ this.inner.flush();
+
+ if (loggingEnabled)
+ changeLogger.logChange(this.getter);
+ } finally {
+ this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
+ }
+ }
+
+ private class MeteredWindowStoreIterator<E> implements WindowStoreIterator<E> {
+
+ private final WindowStoreIterator<E> iter;
+ private final Sensor sensor;
+ private final long startNs;
+
+ public MeteredWindowStoreIterator(WindowStoreIterator<E> iter, Sensor sensor) {
+ this.iter = iter;
+ this.sensor = sensor;
+ this.startNs = time.nanoseconds();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public KeyValue<Long, E> next() {
+ return iter.next();
+ }
+
+ @Override
+ public void remove() {
+ iter.remove();
+ }
+
+ @Override
+ public void close() {
+ try {
+ iter.close();
+ } finally {
+ metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
+ }
+ }
+
+ }
+
+ WindowStore<K, V> inner() {
+ return inner;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
new file mode 100644
index 0000000..e276f83
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class saves out a map of topic/partition=>offsets to a file. The format of the file is UTF-8 text containing the following:
+ * <pre>
+ * <version>
+ * <n>
+ * <topic_name_1> <partition_1> <offset_1>
+ * .
+ * .
+ * .
+ * <topic_name_n> <partition_n> <offset_n>
+ * </pre>
+ * The first line contains a number designating the format version (currently 0), the get line contains
+ * a number giving the total number of offsets. Each successive line gives a topic/partition/offset triple
+ * separated by spaces.
+ */
+public class OffsetCheckpoint {
+
+ private static final int VERSION = 0;
+
+ private final File file;
+ private final Object lock;
+
+ public OffsetCheckpoint(File file) throws IOException {
+ this.file = file;
+ this.lock = new Object();
+ }
+
+ public void write(Map<TopicPartition, Long> offsets) throws IOException {
+ synchronized (lock) {
+ // write to temp file and then swap with the existing file
+ File temp = new File(file.getAbsolutePath() + ".tmp");
+
+ FileOutputStream fileOutputStream = new FileOutputStream(temp);
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream));
+ try {
+ writeIntLine(writer, VERSION);
+ writeIntLine(writer, offsets.size());
+
+ for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
+ writeEntry(writer, entry.getKey(), entry.getValue());
+
+ writer.flush();
+ fileOutputStream.getFD().sync();
+ } finally {
+ writer.close();
+ }
+
+ Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
+ }
+ }
+
+ private void writeIntLine(BufferedWriter writer, int number) throws IOException {
+ writer.write(Integer.toString(number));
+ writer.newLine();
+ }
+
+ private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException {
+ writer.write(part.topic());
+ writer.write(' ');
+ writer.write(Integer.toString(part.partition()));
+ writer.write(' ');
+ writer.write(Long.toString(offset));
+ writer.newLine();
+ }
+
+ public Map<TopicPartition, Long> read() throws IOException {
+ synchronized (lock) {
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new FileReader(file));
+ } catch (FileNotFoundException e) {
+ return Collections.emptyMap();
+ }
+
+ try {
+ int version = readInt(reader);
+ switch (version) {
+ case 0:
+ int expectedSize = readInt(reader);
+ Map<TopicPartition, Long> offsets = new HashMap<>();
+ String line = reader.readLine();
+ while (line != null) {
+ String[] pieces = line.split("\\s+");
+ if (pieces.length != 3)
+ throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.",
+ line));
+
+ String topic = pieces[0];
+ int partition = Integer.parseInt(pieces[1]);
+ long offset = Long.parseLong(pieces[2]);
+ offsets.put(new TopicPartition(topic, partition), offset);
+ line = reader.readLine();
+ }
+ if (offsets.size() != expectedSize)
+ throw new IOException(String.format("Expected %d entries but found only %d",
+ expectedSize,
+ offsets.size()));
+ return offsets;
+
+ default:
+ throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
+ }
+ } finally {
+ if (reader != null)
+ reader.close();
+ }
+ }
+ }
+
+ private int readInt(BufferedReader reader) throws IOException {
+ String line = reader.readLine();
+ if (line == null)
+ throw new EOFException("File ended prematurely.");
+ int val = Integer.parseInt(line);
+ return val;
+ }
+
+ public void delete() throws IOException {
+ file.delete();
+ }
+
+ @Override
+ public String toString() {
+ return this.file.getAbsolutePath();
+ }
+
+}