You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/05 01:47:22 UTC
[2/2] kafka git commit: KAFKA-3016: phase-1. A local store for join window
KAFKA-3016: phase-1. A local store for join window
guozhangwang
An implementation of a local store for join windows. This implementation uses "rolling" RocksDB instances for timestamp-based truncation.
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #726 from ymatsuda/windowed_join
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b0b3e5ae
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b0b3e5ae
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b0b3e5ae
Branch: refs/heads/trunk
Commit: b0b3e5aebf381faf81bd9454ef7b448e2ad922c7
Parents: 57df460
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Jan 4 16:47:17 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Jan 4 16:47:17 2016 -0800
----------------------------------------------------------------------
.../state/KeyValueStoreChangeLogger.java | 87 ---
.../streams/state/MeteredKeyValueStore.java | 18 +-
.../kafka/streams/state/MeteredWindowStore.java | 200 ++++++
.../kafka/streams/state/RocksDBStore.java | 22 +-
.../kafka/streams/state/RocksDBWindowStore.java | 283 ++++++++
.../state/RocksDBWindowStoreSupplier.java | 60 ++
.../kafka/streams/state/StoreChangeLogger.java | 91 +++
.../apache/kafka/streams/state/WindowStore.java | 36 +
.../streams/state/WindowStoreIterator.java | 26 +
.../kafka/streams/state/WindowStoreUtil.java | 55 ++
.../streams/state/KeyValueStoreTestDriver.java | 22 +-
.../streams/state/RocksDBWindowStoreTest.java | 672 +++++++++++++++++++
.../apache/kafka/streams/state/StateUtils.java | 28 +-
.../apache/kafka/test/KStreamTestDriver.java | 2 +-
.../apache/kafka/test/MockProcessorContext.java | 46 +-
15 files changed, 1498 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
deleted file mode 100644
index 2ad1f47..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStoreChangeLogger.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class KeyValueStoreChangeLogger<K, V> {
-
- protected final Serdes<K, V> serialization;
-
- private final Set<K> dirty;
- private final Set<K> removed;
- private final int maxDirty;
- private final int maxRemoved;
-
- private final String topic;
- private int partition;
- private ProcessorContext context;
-
- // always wrap the logged store with the metered store
- public KeyValueStoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
- this.topic = topic;
- this.serialization = serialization;
- this.context = context;
- this.partition = context.id().partition;
-
- this.dirty = new HashSet<>();
- this.removed = new HashSet<>();
- this.maxDirty = 100; // TODO: this needs to be configurable
- this.maxRemoved = 100; // TODO: this needs to be configurable
- }
-
- public void add(K key) {
- this.dirty.add(key);
- this.removed.remove(key);
- }
-
- public void delete(K key) {
- this.dirty.remove(key);
- this.removed.add(key);
- }
-
- public void maybeLogChange(KeyValueStore<K, V> kv) {
- if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
- logChange(kv);
- }
-
- public void logChange(KeyValueStore<K, V> kv) {
- RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
- if (collector != null) {
- Serializer<K> keySerializer = serialization.keySerializer();
- Serializer<V> valueSerializer = serialization.valueSerializer();
-
- for (K k : this.removed) {
- collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
- }
- for (K k : this.dirty) {
- V v = kv.get(k);
- collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
- }
- this.removed.clear();
- this.dirty.clear();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index 16f57a0..743a110 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -30,6 +30,7 @@ import java.util.List;
public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
protected final KeyValueStore<K, V> inner;
+ protected final StoreChangeLogger.ValueGetter<K, V> getter; // reads current values from inner for the change logger
protected final Serdes<K, V> serialization;
protected final String metricScope;
protected final Time time;
@@ -45,11 +46,16 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
private StreamingMetrics metrics;
private boolean loggingEnabled = true;
- private KeyValueStoreChangeLogger<K, V> changeLogger = null;
+ private StoreChangeLogger<K, V> changeLogger = null;
// always wrap the store with the metered store
public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
this.inner = inner;
+ this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
+ public V get(K key) {
+ return inner.get(key);
+ }
+ };
this.serialization = serialization;
this.metricScope = metricScope;
this.time = time != null ? time : new SystemTime();
@@ -79,7 +85,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
serialization.init(context);
- this.changeLogger = this.loggingEnabled ? new KeyValueStoreChangeLogger<>(name, context, serialization) : null;
+ this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
// register and possibly restore the state from the logs
long startNs = time.nanoseconds();
@@ -123,7 +129,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
if (loggingEnabled) {
changeLogger.add(key);
- changeLogger.maybeLogChange(this.inner);
+ changeLogger.maybeLogChange(this.getter);
}
} finally {
this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
@@ -141,7 +147,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
K key = entry.key();
changeLogger.add(key);
}
- changeLogger.maybeLogChange(this.inner);
+ changeLogger.maybeLogChange(this.getter);
}
} finally {
this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
@@ -171,7 +177,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
protected void removed(K key) {
if (loggingEnabled) {
changeLogger.delete(key);
- changeLogger.maybeLogChange(this.inner);
+ changeLogger.maybeLogChange(this.getter);
}
}
@@ -197,7 +203,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.inner.flush();
if (loggingEnabled)
- changeLogger.logChange(this.inner);
+ changeLogger.logChange(this.getter);
} finally {
this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
new file mode 100644
index 0000000..d4ed0e7
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+
+/**
+ * A {@link WindowStore} that wraps another window store, records latency metrics for its
+ * operations and, while logging is enabled, tracks the binary keys it writes in a
+ * {@link StoreChangeLogger} named after the store.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
+
+    protected final WindowStore<K, V> inner;
+    // Reads a binary value back from the inner store on behalf of the change logger.
+    protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+    protected final String metricScope;
+    protected final Time time;
+
+    private Sensor putTime;
+    private Sensor getTime;
+    private Sensor rangeTime;
+    private Sensor flushTime;
+    private Sensor restoreTime;
+    private StreamingMetrics metrics;
+
+    private boolean loggingEnabled = true;
+    private StoreChangeLogger<byte[], byte[]> changeLogger = null;
+
+    /**
+     * Wraps {@code inner}. The wrapper reports the inner store's name.
+     *
+     * @param inner       the window store to meter and, optionally, log
+     * @param metricScope the scope under which the latency sensors are registered
+     * @param time        the clock used to measure latency; {@code null} selects {@link SystemTime}
+     */
+    public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
+        this.inner = inner;
+        this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
+            @Override
+            public byte[] get(byte[] key) {
+                return inner.getInternal(key);
+            }
+        };
+        this.metricScope = metricScope;
+        this.time = time != null ? time : new SystemTime();
+    }
+
+    /**
+     * Turns off change logging. Intended to be called before {@link #init(ProcessorContext)}, which
+     * reads the flag to decide whether to create a change logger and how to register the store.
+     *
+     * @return this store, for chaining
+     */
+    public MeteredWindowStore<K, V> disableLogging() {
+        loggingEnabled = false;
+        return this;
+    }
+
+    @Override
+    public String name() {
+        return inner.name();
+    }
+
+    /**
+     * Registers the latency sensors, creates the change logger if logging is enabled, initializes
+     * the inner store and registers this store with {@code context}. Restored entries are written
+     * with the inner store's {@code putInternal}; the time spent is recorded under "restore".
+     */
+    @Override
+    public void init(ProcessorContext context) {
+        final String name = name();
+        this.metrics = context.metrics();
+        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
+        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
+        this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
+        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
+        this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
+
+        // Keys and values are logged in their binary (already serialized) form.
+        this.changeLogger = this.loggingEnabled ?
+                new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null;
+
+        // register and possibly restore the state from the logs
+        long startNs = time.nanoseconds();
+        inner.init(context);
+        try {
+            context.register(this, loggingEnabled, new StateRestoreCallback() {
+                @Override
+                public void restore(byte[] key, byte[] value) {
+                    inner.putInternal(key, value);
+                }
+            });
+        } finally {
+            this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return inner.persistent();
+    }
+
+    /**
+     * Fetches from the inner store. The "range" latency covers the time from this call
+     * until the returned iterator is closed.
+     */
+    @Override
+    public WindowStoreIterator<V> fetch(K key, long timestamp) {
+        return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timestamp), this.rangeTime);
+    }
+
+    @Override
+    public void put(K key, V value) {
+        putAndReturnInternalKey(key, value);
+    }
+
+    /**
+     * Puts the entry into the inner store and, if logging is enabled, marks the returned binary
+     * key as changed in the change logger. The time spent is recorded under "put".
+     *
+     * @return the binary key that was written, or {@code null} if the inner store dropped the entry
+     */
+    @Override
+    public byte[] putAndReturnInternalKey(K key, V value) {
+        long startNs = time.nanoseconds();
+        try {
+            byte[] binKey = this.inner.putAndReturnInternalKey(key, value);
+
+            // The inner store may return null when it drops the record (RocksDBWindowStore does so for
+            // records outside its retention period). Nothing was written then, and a null key in the
+            // change logger's dirty set would presumably be passed to the getter when changes are logged.
+            if (loggingEnabled && binKey != null) {
+                changeLogger.add(binKey);
+                changeLogger.maybeLogChange(this.getter);
+            }
+
+            return binKey;
+        } finally {
+            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
+        }
+    }
+
+    /** Writes a binary entry straight to the inner store; this is neither metered nor logged. */
+    @Override
+    public void putInternal(byte[] binaryKey, byte[] binaryValue) {
+        inner.putInternal(binaryKey, binaryValue);
+    }
+
+    /** Reads a binary entry from the inner store; the time spent is recorded under "get". */
+    @Override
+    public byte[] getInternal(byte[] binaryKey) {
+        long startNs = time.nanoseconds();
+        try {
+            return this.inner.getInternal(binaryKey);
+        } finally {
+            this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    /**
+     * Flushes the inner store and then, if logging is enabled, asks the change logger to log
+     * all pending changes. The time spent is recorded under "flush".
+     */
+    @Override
+    public void flush() {
+        long startNs = time.nanoseconds();
+        try {
+            this.inner.flush();
+
+            if (loggingEnabled)
+                changeLogger.logChange(this.getter);
+        } finally {
+            this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
+        }
+    }
+
+    /**
+     * Delegating iterator that, when closed, records the time elapsed since it was created.
+     */
+    private class MeteredWindowStoreIterator<E> implements WindowStoreIterator<E> {
+
+        private final WindowStoreIterator<E> iter;
+        private final Sensor sensor;
+        private final long startNs;
+
+        public MeteredWindowStoreIterator(WindowStoreIterator<E> iter, Sensor sensor) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.startNs = time.nanoseconds();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public E next() {
+            return iter.next();
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+
+        @Override
+        public void close() {
+            try {
+                iter.close();
+            } finally {
+                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
+            }
+        }
+    }
+
+    // Package-private access to the wrapped store.
+    WindowStore<K, V> inner() {
+        return inner;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
index 029d72f..a32faf4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
@@ -48,7 +48,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private static final int MAX_WRITE_BUFFERS = 3;
private static final String DB_FILE_DIR = "rocksdb";
- private final String topic;
+ private final String name;
private final Options options;
private final WriteOptions wOptions;
@@ -56,11 +56,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private Serdes<K, V> serdes;
private ProcessorContext context;
- private String dirName;
+ protected File dbDir;
private RocksDB db;
public RocksDBStore(String name, Serdes<K, V> serdes) {
- this.topic = name;
+ this.name = name;
this.serdes = serdes;
// initialize the rocksdb options
@@ -88,8 +88,8 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
serdes.init(context);
this.context = context;
- this.dirName = this.context.stateDir() + File.separator + DB_FILE_DIR;
- this.db = openDB(new File(this.dirName, this.topic), this.options, TTL_SECONDS);
+ this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
+ this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
}
private RocksDB openDB(File dir, Options options, int ttl) {
@@ -98,19 +98,19 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
dir.getParentFile().mkdirs();
return RocksDB.open(options, dir.toString());
} else {
- throw new KafkaException("Change log is not supported for store " + this.topic + " since it is TTL based.");
+ throw new KafkaException("Change log is not supported for store " + this.name + " since it is TTL based.");
// TODO: support TTL with change log?
// return TtlDB.open(options, dir.toString(), ttl, false);
}
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
- throw new KafkaException("Error opening store " + this.topic + " at location " + dir.toString(), e);
+ throw new KafkaException("Error opening store " + this.name + " at location " + dir.toString(), e);
}
}
@Override
public String name() {
- return this.topic;
+ return this.name;
}
@Override
@@ -124,7 +124,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.topic, e);
+ throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.name, e);
}
}
@@ -138,7 +138,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.topic, e);
+ throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.name, e);
}
}
@@ -173,7 +173,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
db.flush(fOptions);
} catch (RocksDBException e) {
// TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing flush from store " + this.topic, e);
+ throw new KafkaException("Error while executing flush from store " + this.name, e);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
new file mode 100644
index 0000000..5189318
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.SimpleTimeZone;
+
+/**
+ * A {@link WindowStore} backed by a fixed number of RocksDB "segments", each holding the records
+ * whose timestamps fall into one segment interval. Segments that fall out of the retention period
+ * are closed and their directories deleted, which truncates old data by timestamp.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
+
+    public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute
+
+    // A RocksDB store holding the records of one segment interval; id is the segment id.
+    private static class Segment extends RocksDBStore<byte[], byte[]> {
+        public final long id;
+
+        Segment(String name, long id) {
+            super(name, WindowStoreUtil.INNER_SERDES);
+            this.id = id;
+        }
+
+        // Deletes this segment's RocksDB directory; callers close the segment first.
+        public void destroy() {
+            Utils.delete(dbDir);
+        }
+    }
+
+    /**
+     * Iterates over the values of several per-segment iterators in order, one after another.
+     */
+    private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
+        private final Serdes<?, V> serdes;
+        private final KeyValueIterator<byte[], byte[]>[] iterators;
+        // index of the iterator currently being consumed
+        private int index = 0;
+
+        RocksDBWindowStoreIterator(Serdes<?, V> serdes) {
+            this(serdes, WindowStoreUtil.NO_ITERATORS);
+        }
+
+        RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
+            this.serdes = serdes;
+            this.iterators = iterators;
+        }
+
+        @Override
+        public boolean hasNext() {
+            // skip past exhausted segment iterators
+            while (index < iterators.length) {
+                if (iterators[index].hasNext())
+                    return true;
+
+                index++;
+            }
+            return false;
+        }
+
+        @Override
+        public V next() {
+            // hasNext() advances past exhausted iterators, so next() also works when the
+            // caller did not call hasNext() first.
+            if (!hasNext())
+                throw new NoSuchElementException();
+
+            return serdes.valueFrom(iterators[index].next().value());
+        }
+
+        @Override
+        public void remove() {
+            if (index < iterators.length)
+                iterators[index].remove();
+        }
+
+        @Override
+        public void close() {
+            for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
+                iterator.close();
+            }
+        }
+    }
+
+    private final String name;
+    private final long windowBefore;
+    private final long windowAfter;
+    private final long segmentInterval;
+    private final Segment[] segments;
+    private final Serdes<K, V> serdes;
+    private final SimpleDateFormat formatter;
+
+    private ProcessorContext context;
+    private long currentSegmentId = -1L;
+    private int seqnum = 0;
+
+    /**
+     * @param name            the store name; segment names are derived from it
+     * @param windowBefore    how far before a fetch timestamp the window extends
+     * @param windowAfter     how far after a fetch timestamp the window extends
+     * @param retentionPeriod how long records are kept; raised to at least twice the total window size
+     * @param numSegments     the number of segments; must be at least 2
+     * @param serdes          the serializers/deserializers for keys and values
+     * @throws IllegalArgumentException if {@code numSegments} is less than 2
+     */
+    public RocksDBWindowStore(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes) {
+        // The segment interval is retentionPeriod / (numSegments - 1), so at least two segments are needed.
+        if (numSegments < 2)
+            throw new IllegalArgumentException("numSegments must be at least 2 but was " + numSegments);
+
+        this.name = name;
+        this.windowBefore = windowBefore;
+        this.windowAfter = windowAfter;
+
+        // The retention period must be at least two times as long as the total window size
+        if ((this.windowBefore + this.windowAfter + 1) * 2 > retentionPeriod)
+            retentionPeriod = (this.windowBefore + this.windowAfter + 1) * 2;
+
+        // The segment interval must be at least MIN_SEGMENT_INTERVAL
+        this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
+
+        this.segments = new Segment[numSegments];
+        this.serdes = serdes;
+
+        // Create a date formatter. Formatted timestamps are used as segment name suffixes
+        this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
+        this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void init(ProcessorContext context) {
+        // Segments are created lazily and initialized with this context.
+        this.context = context;
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public void flush() {
+        for (Segment segment : segments) {
+            if (segment != null)
+                segment.flush();
+        }
+    }
+
+    @Override
+    public void close() {
+        for (Segment segment : segments) {
+            if (segment != null)
+                segment.close();
+        }
+    }
+
+    @Override
+    public void put(K key, V value) {
+        putAndReturnInternalKey(key, value);
+    }
+
+    /**
+     * Stores the entry under a binary key built from the key, the current record timestamp
+     * ({@code context.timestamp()}) and a sequence number, in the segment for that timestamp.
+     *
+     * @return the binary key, or {@code null} if the timestamp is outside the retention period
+     *         and the entry was dropped
+     */
+    @Override
+    public byte[] putAndReturnInternalKey(K key, V value) {
+        long timestamp = context.timestamp();
+        long segmentId = segmentId(timestamp);
+
+        if (segmentId > currentSegmentId) {
+            // A new segment will be created. Clean up old segments first.
+            currentSegmentId = segmentId;
+            cleanup();
+        }
+
+        // If the record is within the retention period, put it in the store.
+        if (segmentId > currentSegmentId - segments.length) {
+            // presumably distinguishes repeated (key, timestamp) pairs; the mask keeps it non-negative
+            seqnum = (seqnum + 1) & 0x7FFFFFFF;
+            byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes);
+            getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
+            return binaryKey;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Stores an already-serialized entry (e.g. one restored from the change log) in the segment
+     * for the timestamp encoded in {@code binaryKey}; entries outside the retention period are dropped.
+     */
+    @Override
+    public void putInternal(byte[] binaryKey, byte[] binaryValue) {
+        long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
+
+        if (segmentId > currentSegmentId) {
+            // A new segment will be created. Clean up old segments first.
+            currentSegmentId = segmentId;
+            cleanup();
+        }
+
+        // If the record is within the retention period, put it in the store.
+        if (segmentId > currentSegmentId - segments.length)
+            getSegment(segmentId).put(binaryKey, binaryValue);
+    }
+
+    /** Returns the segment's value for {@code binaryKey}, or {@code null} if that segment is not present. */
+    @Override
+    public byte[] getInternal(byte[] binaryKey) {
+        long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
+
+        Segment segment = segments[(int) (segmentId % segments.length)];
+
+        if (segment != null && segment.id == segmentId) {
+            return segment.get(binaryKey);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Returns the values whose binary keys lie between those built for {@code key} at
+     * {@code timestamp - windowBefore} and {@code timestamp + windowAfter + 1} (times clamped at 0),
+     * scanning the retained segments that overlap that time range in ascending segment order.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public WindowStoreIterator<V> fetch(K key, long timestamp) {
+        long timeFrom = Math.max(0L, timestamp - windowBefore);
+        long timeTo = Math.max(0L, timestamp + windowAfter);
+
+        long segFrom = segmentId(timeFrom);
+        long segTo = segmentId(timeTo);
+
+        // Bounds use seqnum 0; timeTo + 1 is presumably there because range() treats the
+        // upper bound as exclusive, so that entries at timeTo with any seqnum are included.
+        byte[] binaryFrom = WindowStoreUtil.toBinaryKey(key, timeFrom, 0, serdes);
+        byte[] binaryUntil = WindowStoreUtil.toBinaryKey(key, timeTo + 1L, 0, serdes);
+
+        ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
+
+        for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
+            Segment segment = segments[(int) (segmentId % segments.length)];
+
+            // Only use the slot if it still holds this segment (slots are reused modulo segments.length).
+            if (segment != null && segment.id == segmentId)
+                iterators.add(segment.range(binaryFrom, binaryUntil));
+        }
+
+        if (iterators.size() > 0) {
+            return new RocksDBWindowStoreIterator<>(serdes, iterators.toArray(new KeyValueIterator[iterators.size()]));
+        } else {
+            return new RocksDBWindowStoreIterator<>(serdes);
+        }
+    }
+
+    // Returns the segment for segmentId, creating and initializing it if its slot is empty.
+    private Segment getSegment(long segmentId) {
+        int index = (int) (segmentId % segments.length);
+
+        if (segments[index] == null) {
+            segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId);
+            segments[index].init(context);
+        }
+
+        return segments[index];
+    }
+
+    // Closes and deletes every segment that has fallen out of the retention period.
+    private void cleanup() {
+        for (int i = 0; i < segments.length; i++) {
+            if (segments[i] != null && segments[i].id <= currentSegmentId - segments.length) {
+                segments[i].close();
+                segments[i].destroy();
+                segments[i] = null;
+            }
+        }
+    }
+
+    /** Returns the id of the segment that holds records with the given timestamp. */
+    public long segmentId(long timestamp) {
+        return timestamp / segmentInterval;
+    }
+
+    /** Returns the segment's start time formatted as yyyyMMddHHmm (GMT), used in segment names. */
+    public String directorySuffix(long segmentId) {
+        return formatter.format(new Date(segmentId * segmentInterval));
+    }
+
+    // this method is used by a test
+    public Set<Long> segmentIds() {
+        HashSet<Long> segmentIds = new HashSet<>();
+
+        for (Segment segment : segments) {
+            if (segment != null)
+                segmentIds.add(segment.id);
+        }
+
+        return segmentIds;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
new file mode 100644
index 0000000..41c725d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+
+/**
+ * A {@link StateStoreSupplier} that creates a {@link RocksDBWindowStore} wrapped in a
+ * {@link MeteredWindowStore}.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ */
+public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final long windowBefore;
+    private final long windowAfter;
+    private final long retentionPeriod;
+    private final int numSegments;
+    private final Serdes<K, V> serdes;
+    private final Time time;
+
+    /**
+     * Captures the parameters passed to each new {@link RocksDBWindowStore}; {@code time} is
+     * passed to the {@link MeteredWindowStore} ({@code null} selects the system clock).
+     */
+    protected RocksDBWindowStoreSupplier(String name, long windowBefore, long windowAfter, long retentionPeriod, int numSegments, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.windowBefore = windowBefore;
+        this.windowAfter = windowAfter;
+        this.retentionPeriod = retentionPeriod;
+        this.numSegments = numSegments;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    /** Creates a new metered RocksDB window store named {@link #name()}. */
+    @Override
+    public StateStore get() {
+        return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, windowBefore, windowAfter, retentionPeriod, numSegments, serdes), "rocksdb-window", time);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
new file mode 100644
index 0000000..ee6624e
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tracks the keys of a state store that changed since the last {@link #logChange(ValueGetter)} and writes
+ * them to a changelog topic.
+ *
+ * <p>{@link #add(Object)} marks a key as written and {@link #delete(Object)} marks it as removed; a key is
+ * held in at most one of the two sets. {@link #logChange(ValueGetter)} does three things:
+ * <ul>
+ *   <li>sends a record with a {@code null} value for every removed key;</li>
+ *   <li>sends a record with the value returned by the {@link ValueGetter} for every written key;</li>
+ *   <li>clears both sets.</li>
+ * </ul>
+ * All records go to {@code topic} at the partition of the context's task id.
+ *
+ * <p>The context passed to the constructor must also implement {@link RecordCollector.Supplier}, because
+ * {@link #logChange(ValueGetter)} casts it to obtain the collector. If that collector is {@code null},
+ * nothing is sent and the tracked keys are kept.
+ *
+ * <p>NOTE: keys are tracked in {@link HashSet}s, so de-duplication relies on {@code K}'s
+ * {@code equals}/{@code hashCode}. For {@code byte[]} keys that means identity, not content.
+ *
+ * @param <K> the key type of the logged store
+ * @param <V> the value type of the logged store
+ */
+public class StoreChangeLogger<K, V> {
+
+    /**
+     * Supplies the current value of a key at the moment the change is written to the changelog.
+     */
+    public interface ValueGetter<K, V> {
+        V get(K key);
+    }
+
+    protected final Serdes<K, V> serialization;
+
+    private final Set<K> dirty;
+    private final Set<K> removed;
+    private final int maxDirty;
+    private final int maxRemoved;
+
+    private final String topic;
+    private final int partition;
+    private final ProcessorContext context;
+
+    /**
+     * Creates a logger that writes to {@code topic} at partition {@code context.id().partition}.
+     *
+     * <p>Callers should always wrap the logged store with the metered store.
+     *
+     * @param topic         the changelog topic
+     * @param context       the processor context; must also be a {@link RecordCollector.Supplier}
+     * @param serialization serializers for the keys and values of changelog records
+     */
+    public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
+        this.topic = topic;
+        this.serialization = serialization;
+        this.context = context;
+        this.partition = context.id().partition;
+
+        this.dirty = new HashSet<>();
+        this.removed = new HashSet<>();
+        this.maxDirty = 100; // TODO: this needs to be configurable
+        this.maxRemoved = 100; // TODO: this needs to be configurable
+    }
+
+    /** Records that {@code key} was written; cancels any pending removal of the same key. */
+    public void add(K key) {
+        this.dirty.add(key);
+        this.removed.remove(key);
+    }
+
+    /** Records that {@code key} was removed; cancels any pending write of the same key. */
+    public void delete(K key) {
+        this.dirty.remove(key);
+        this.removed.add(key);
+    }
+
+    /**
+     * Calls {@link #logChange(ValueGetter)} once more than {@code maxDirty} keys are pending writes or
+     * more than {@code maxRemoved} keys are pending removals (both currently 100).
+     */
+    public void maybeLogChange(ValueGetter<K, V> getter) {
+        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
+            logChange(getter);
+    }
+
+    /**
+     * Sends all pending removals (as {@code null} values) and writes (with values read through
+     * {@code getter}), then clears the pending sets. Does nothing if the context has no record collector.
+     *
+     * @throws ClassCastException if the context is not a {@link RecordCollector.Supplier}
+     */
+    public void logChange(ValueGetter<K, V> getter) {
+        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+        if (collector != null) {
+            Serializer<K> keySerializer = serialization.keySerializer();
+            Serializer<V> valueSerializer = serialization.valueSerializer();
+
+            for (K k : this.removed) {
+                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
+            }
+            for (K k : this.dirty) {
+                V v = getter.get(k);
+                collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
+            }
+            this.removed.clear();
+            this.dirty.clear();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
new file mode 100644
index 0000000..344aa91
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.processor.StateStore;
+
+/**
+ * A {@link StateStore} holding values for a key together with a timestamp. Values can be looked up for
+ * a key within a time window around a given timestamp.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public interface WindowStore<K, V> extends StateStore {
+
+    /**
+     * Stores {@code value} under {@code key}.
+     * NOTE(review): there is no timestamp parameter. The RocksDB implementation's tests set the time on
+     * the processor context before each put — confirm that this is the contract for all implementations.
+     */
+    void put(K key, V value);
+
+    /**
+     * Returns the values stored for {@code key} whose timestamps lie in the window around
+     * {@code timestamp}. The returned iterator declares {@code close()}; callers should invoke it when done.
+     */
+    WindowStoreIterator<V> fetch(K key, long timestamp);
+
+    /**
+     * Same as {@link #put(Object, Object)} but also returns the binary key the value was stored under
+     * (presumably the serialized key plus timestamp and sequence number — confirm with the implementation).
+     */
+    byte[] putAndReturnInternalKey(K key, V value);
+
+    /** Writes an already-serialized key/value pair directly, bypassing the serdes. */
+    void putInternal(byte[] binaryKey, byte[] binaryValue);
+
+    /** Reads the serialized value stored under an already-serialized binary key. */
+    byte[] getInternal(byte[] binaryKey);
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
new file mode 100644
index 0000000..e57a00f
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import java.util.Iterator;
+
+/**
+ * An {@link Iterator} over values returned by a window store fetch that additionally exposes
+ * {@link #close()}.
+ *
+ * @param <V> the element (value) type
+ */
+public interface WindowStoreIterator<V> extends Iterator<V> {
+
+    /**
+     * Releases whatever the iterator holds; call it once iteration is finished.
+     * NOTE(review): confirm whether implementations tolerate repeated calls.
+     */
+    void close();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java
new file mode 100644
index 0000000..b11a206
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Helpers for the binary key layout used by window stores. The layout is the serialized key, followed
+ * by an 8-byte timestamp, followed by a 4-byte sequence number. Both numbers are written with
+ * {@link ByteBuffer}'s default (big-endian) order.
+ */
+public class WindowStoreUtil<K, V> {
+
+    public static final int TIMESTAMP_SIZE = 8;
+    public static final int SEQNUM_SIZE = 4;
+    public static final Serdes<byte[], byte[]> INNER_SERDES = Serdes.withBuiltinTypes("", byte[].class, byte[].class);
+    @SuppressWarnings("unchecked")
+    public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0];
+
+    /** Number of bytes that follow the serialized key: timestamp plus sequence number. */
+    private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
+
+    /**
+     * Serializes {@code key} and appends {@code timestamp} and {@code seqnum}.
+     *
+     * @return a new array of length {@code serializedKey.length + 12}
+     */
+    public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, Serdes<K, ?> serdes) {
+        byte[] serializedKey = serdes.rawKey(key);
+
+        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + SUFFIX_SIZE);
+        buf.put(serializedKey);
+        buf.putLong(timestamp);
+        buf.putInt(seqnum);
+
+        return buf.array();
+    }
+
+    /**
+     * Strips the timestamp and sequence number from {@code binaryKey} and deserializes the remaining key.
+     *
+     * @throws IllegalArgumentException if {@code binaryKey} is shorter than the 12-byte suffix
+     */
+    public static <K> K keyFromBinaryKey(byte[] binaryKey, Serdes<K, ?> serdes) {
+        checkBinaryKeyLength(binaryKey);
+        byte[] bytes = new byte[binaryKey.length - SUFFIX_SIZE];
+
+        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
+
+        return serdes.keyFrom(bytes);
+    }
+
+    /**
+     * Reads the timestamp stored right after the serialized key in {@code binaryKey}.
+     *
+     * @throws IllegalArgumentException if {@code binaryKey} is shorter than the 12-byte suffix
+     */
+    public static long timestampFromBinaryKey(byte[] binaryKey) {
+        checkBinaryKeyLength(binaryKey);
+        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - SUFFIX_SIZE);
+    }
+
+    // Rejects keys too short to contain the timestamp/seqnum suffix with a descriptive error, instead of
+    // the NegativeArraySizeException / IndexOutOfBoundsException the offset arithmetic would otherwise cause.
+    private static void checkBinaryKeyLength(byte[] binaryKey) {
+        if (binaryKey.length < SUFFIX_SIZE) {
+            throw new IllegalArgumentException("Binary window key must be at least " + SUFFIX_SIZE
+                    + " bytes long, but was " + binaryKey.length);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 9e24741..6f74da8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -221,7 +221,6 @@ public class KeyValueStoreTestDriver<K, V> {
private final Map<K, V> flushedEntries = new HashMap<>();
private final Set<K> flushedRemovals = new HashSet<>();
private final List<Entry<K, V>> restorableEntries = new LinkedList<>();
- private final StreamingConfig config;
private final MockProcessorContext context;
private final Map<String, StateStore> storeMap = new HashMap<>();
private final StreamingMetrics metrics = new StreamingMetrics() {
@@ -235,7 +234,7 @@ public class KeyValueStoreTestDriver<K, V> {
}
};
private final RecordCollector recordCollector;
- private File stateDir = new File("build/data").getAbsoluteFile();
+ private File stateDir = null;
protected KeyValueStoreTestDriver(Serdes<K, V> serdes) {
this.serdes = serdes;
@@ -247,6 +246,9 @@ public class KeyValueStoreTestDriver<K, V> {
recordFlushed(record.key(), record.value());
}
};
+ this.stateDir = StateUtils.tempDir();
+ this.stateDir.mkdirs();
+
Properties props = new Properties();
props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
@@ -254,9 +256,8 @@ public class KeyValueStoreTestDriver<K, V> {
props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass());
props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass());
props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass());
- this.config = new StreamingConfig(props);
- this.context = new MockProcessorContext(null, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
+ this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(),
serdes.valueDeserializer(), recordCollector) {
@Override
public TaskId id() {
@@ -286,24 +287,11 @@ public class KeyValueStoreTestDriver<K, V> {
@Override
public File stateDir() {
- if (stateDir == null) {
- stateDir = StateUtils.tempDir();
- }
- stateDir.mkdirs();
return stateDir;
}
};
}
- /**
- * Set the directory that should be used by the store for local disk storage.
- *
- * @param dir the directory; may be null if no local storage is allowed
- */
- public void useStateDir(File dir) {
- this.stateDir = dir;
- }
-
@SuppressWarnings("unchecked")
protected <K1, V1> void recordFlushed(K1 key, V1 value) {
K k = (K) key;
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
new file mode 100644
index 0000000..6bfddfe
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
@@ -0,0 +1,672 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.test.MockProcessorContext;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RocksDBWindowStoreTest {
+
+ private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
+ private final ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer();
+ private final int numSegments = 3;
+ private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
+ private final long retentionPeriod = segmentSize * (numSegments - 1);
+ private final long windowSize = 3;
+ private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class);
+
+
+    /**
+     * Builds a window store through {@link RocksDBWindowStoreSupplier} (named "window", using this test's
+     * retention period and segment count, with a {@code null} time source) and initializes it with
+     * {@code context}.
+     */
+    // supplier.get() is declared to return StateStore; RocksDBWindowStoreSupplier builds a WindowStore<K, V>.
+    @SuppressWarnings("unchecked")
+    protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, long windowBefore, long windowAfter, Serdes<K, V> serdes) {
+        StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", windowBefore, windowAfter, retentionPeriod, numSegments, serdes, null);
+        WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
+        store.init(context);
+        return store;
+    }
+
+ /**
+ * Symmetric window (windowBefore = windowAfter = windowSize = 3). fetch(key, t) is expected to return the
+ * values put for key with timestamps in [t - 3, t + 3], oldest first. flush() is expected to send every
+ * put to the change log.
+ */
+ @Test
+ public void testPutAndFetch() throws IOException {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ // Captures every record the store sends to its change log, as raw key/value bytes.
+ final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+ RecordCollector recordCollector = new RecordCollector(producer) {
+ @Override
+ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+ changeLog.add(new Entry<>(
+ keySerializer.serialize(record.topic(), record.key()),
+ valueSerializer.serialize(record.topic(), record.value()))
+ );
+ }
+ };
+
+ MockProcessorContext context = new MockProcessorContext(
+ null, baseDir,
+ byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+ recordCollector);
+
+ WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+ try {
+ // 4 time units before the first segment boundary, so the puts below fall into two segments
+ // (testRolling shows segment ids are timestamp / segmentSize).
+ long startTime = segmentSize - 4L;
+
+ // put() takes its timestamp from the context time set just before it.
+ context.setTime(startTime + 0L);
+ store.put(0, "zero");
+ context.setTime(startTime + 1L);
+ store.put(1, "one");
+ context.setTime(startTime + 2L);
+ store.put(2, "two");
+ context.setTime(startTime + 3L);
+ // (3, "three") is not put
+ context.setTime(startTime + 4L);
+ store.put(4, "four");
+ context.setTime(startTime + 5L);
+ store.put(5, "five");
+
+ assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L)));
+ assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L)));
+ assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L)));
+ assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L)));
+
+ // Key 2 now has values at offsets 2..8; the fetches below slide the [t - 3, t + 3] window across them.
+ context.setTime(startTime + 3L);
+ store.put(2, "two+1");
+ context.setTime(startTime + 4L);
+ store.put(2, "two+2");
+ context.setTime(startTime + 5L);
+ store.put(2, "two+3");
+ context.setTime(startTime + 6L);
+ store.put(2, "two+4");
+ context.setTime(startTime + 7L);
+ store.put(2, "two+5");
+ context.setTime(startTime + 8L);
+ store.put(2, "two+6");
+
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L)));
+ assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L)));
+ assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L)));
+ assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L)));
+ assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L)));
+ assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L)));
+ assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L)));
+ assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L)));
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+
+ // entriesByKey (defined elsewhere in this class) presumably decodes the change log into
+ // "value@offset" strings per key, with offsets relative to startTime.
+ Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+ assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+ assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+ assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+ assertNull(entriesByKey.get(3));
+ assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+ assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+ assertNull(entriesByKey.get(6));
+
+ } finally {
+ store.close();
+ }
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ /**
+ * Backward-only window (windowBefore = 3, windowAfter = 0). fetch(key, t) is expected to return the values
+ * put for key with timestamps in [t - 3, t], oldest first. flush() is expected to send every put to the
+ * change log.
+ */
+ @Test
+ public void testPutAndFetchBefore() throws IOException {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ // Captures every record the store sends to its change log, as raw key/value bytes.
+ final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+ RecordCollector recordCollector = new RecordCollector(producer) {
+ @Override
+ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+ changeLog.add(new Entry<>(
+ keySerializer.serialize(record.topic(), record.key()),
+ valueSerializer.serialize(record.topic(), record.value()))
+ );
+ }
+ };
+
+ MockProcessorContext context = new MockProcessorContext(
+ null, baseDir,
+ byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+ recordCollector);
+
+ WindowStore<Integer, String> store = createWindowStore(context, windowSize, 0, serdes);
+ try {
+ // 4 time units before the first segment boundary, so the puts below fall into two segments.
+ long startTime = segmentSize - 4L;
+
+ // put() takes its timestamp from the context time set just before it.
+ context.setTime(startTime + 0L);
+ store.put(0, "zero");
+ context.setTime(startTime + 1L);
+ store.put(1, "one");
+ context.setTime(startTime + 2L);
+ store.put(2, "two");
+ context.setTime(startTime + 3L);
+ // (3, "three") is not put
+ context.setTime(startTime + 4L);
+ store.put(4, "four");
+ context.setTime(startTime + 5L);
+ store.put(5, "five");
+
+ assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L)));
+ assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L)));
+ assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L)));
+ assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L)));
+
+ // Key 2 now has values at offsets 2..8; the fetches below slide the [t - 3, t] window across them.
+ context.setTime(startTime + 3L);
+ store.put(2, "two+1");
+ context.setTime(startTime + 4L);
+ store.put(2, "two+2");
+ context.setTime(startTime + 5L);
+ store.put(2, "two+3");
+ context.setTime(startTime + 6L);
+ store.put(2, "two+4");
+ context.setTime(startTime + 7L);
+ store.put(2, "two+5");
+ context.setTime(startTime + 8L);
+ store.put(2, "two+6");
+
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
+ assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L)));
+ assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L)));
+ assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L)));
+ assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L)));
+ assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L)));
+ assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L)));
+ assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L)));
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+
+ // entriesByKey (defined elsewhere in this class) presumably decodes the change log into
+ // "value@offset" strings per key, with offsets relative to startTime.
+ Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+ assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+ assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+ assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+ assertNull(entriesByKey.get(3));
+ assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+ assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+ assertNull(entriesByKey.get(6));
+
+ } finally {
+ store.close();
+ }
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ /**
+ * Forward-only window (windowBefore = 0, windowAfter = 3). fetch(key, t) is expected to return the values
+ * put for key with timestamps in [t, t + 3], oldest first. flush() is expected to send every put to the
+ * change log.
+ */
+ @Test
+ public void testPutAndFetchAfter() throws IOException {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ // Captures every record the store sends to its change log, as raw key/value bytes.
+ final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+ RecordCollector recordCollector = new RecordCollector(producer) {
+ @Override
+ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+ changeLog.add(new Entry<>(
+ keySerializer.serialize(record.topic(), record.key()),
+ valueSerializer.serialize(record.topic(), record.value()))
+ );
+ }
+ };
+
+ MockProcessorContext context = new MockProcessorContext(
+ null, baseDir,
+ byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+ recordCollector);
+
+ WindowStore<Integer, String> store = createWindowStore(context, 0, windowSize, serdes);
+ try {
+ // 4 time units before the first segment boundary, so the puts below fall into two segments.
+ long startTime = segmentSize - 4L;
+
+ // put() takes its timestamp from the context time set just before it.
+ context.setTime(startTime + 0L);
+ store.put(0, "zero");
+ context.setTime(startTime + 1L);
+ store.put(1, "one");
+ context.setTime(startTime + 2L);
+ store.put(2, "two");
+ context.setTime(startTime + 3L);
+ // (3, "three") is not put
+ context.setTime(startTime + 4L);
+ store.put(4, "four");
+ context.setTime(startTime + 5L);
+ store.put(5, "five");
+
+ assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L)));
+ assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L)));
+ assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L)));
+ assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L)));
+
+ // Key 2 now has values at offsets 2..8; the fetches below slide the [t, t + 3] window across them.
+ context.setTime(startTime + 3L);
+ store.put(2, "two+1");
+ context.setTime(startTime + 4L);
+ store.put(2, "two+2");
+ context.setTime(startTime + 5L);
+ store.put(2, "two+3");
+ context.setTime(startTime + 6L);
+ store.put(2, "two+4");
+ context.setTime(startTime + 7L);
+ store.put(2, "two+5");
+ context.setTime(startTime + 8L);
+ store.put(2, "two+6");
+
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L)));
+ assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L)));
+ assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L)));
+ assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L)));
+ assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L)));
+ assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L)));
+ assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L)));
+ assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L)));
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+
+ // entriesByKey (defined elsewhere in this class) presumably decodes the change log into
+ // "value@offset" strings per key, with offsets relative to startTime.
+ Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+ assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+ assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+ assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+ assertNull(entriesByKey.get(3));
+ assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+ assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+ assertNull(entriesByKey.get(6));
+
+ } finally {
+ store.close();
+ }
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+    /**
+     * Puts several values for the same key at the same timestamp. Checks that fetch returns all of them,
+     * duplicates included, in insertion order while the timestamp is within the symmetric window. Also
+     * checks that they reach the change log after flush().
+     */
+    @Test
+    public void testPutSameKeyTimestamp() throws IOException {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            // Captures every record the store sends to its change log, as raw key/value bytes.
+            final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                            keySerializer.serialize(record.topic(), record.key()),
+                            valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            try {
+                long startTime = segmentSize - 4L;
+
+                context.setTime(startTime);
+                store.put(0, "zero");
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime)));
+
+                // Same key, same timestamp: every put must be kept, including the exact duplicate "zero".
+                context.setTime(startTime);
+                store.put(0, "zero");
+                context.setTime(startTime);
+                store.put(0, "zero+");
+                context.setTime(startTime);
+                store.put(0, "zero++");
+
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L)));
+                assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L)));
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L)));
+
+                // Flush the store and verify all current entries were properly flushed ...
+                store.flush();
+
+                Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+                // entriesByKey yields a Set per key, so the two identical "zero@0" records collapse into a
+                // single element. This checks the distinct (value, offset) pairs, not how many records were
+                // logged; listing "zero@0" twice in the expected set would only suggest a count check.
+                assertEquals(Utils.mkSet("zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
+
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    /**
+     * Verifies segment rolling: entries are written half a segment apart in stream time, and after
+     * each step the live segment ids, the fetchable entries and (at the end) the on-disk segment
+     * directories are compared against the expected values.
+     */
+    @Test
+    public void testRolling() throws IOException {
+        File tmpDir = Files.createTempDirectory("test").toFile();
+        try {
+            // Capture every change-log record the store emits as serialized key/value pairs.
+            final List<Entry<byte[], byte[]>> recorded = new ArrayList<>();
+            Producer<byte[], byte[]> mockProducer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector collector = new RecordCollector(mockProducer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    byte[] rawKey = keySerializer.serialize(record.topic(), record.key());
+                    byte[] rawValue = valueSerializer.serialize(record.topic(), record.value());
+                    recorded.add(new Entry<>(rawKey, rawValue));
+                }
+            };
+
+            MockProcessorContext ctx = new MockProcessorContext(
+                    null, tmpDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    collector);
+
+            WindowStore<Integer, String> store = createWindowStore(ctx, windowSize, windowSize, serdes);
+            RocksDBWindowStore<Integer, String> inner =
+                    (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+            try {
+                long base = segmentSize * 2;
+                long step = segmentSize / 2;
+
+                // at[i] is the stream time at which key i is written (key 3 is never written).
+                long[] at = new long[9];
+                for (int i = 0; i < at.length; i++) {
+                    at[i] = base + step * i;
+                }
+
+                ctx.setTime(at[0]);
+                store.put(0, "zero");
+                assertEquals(Utils.mkSet(2L), inner.segmentIds());
+
+                ctx.setTime(at[1]);
+                store.put(1, "one");
+                assertEquals(Utils.mkSet(2L), inner.segmentIds());
+
+                ctx.setTime(at[2]);
+                store.put(2, "two");
+                assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
+
+                // Time advances, but (3, "three") is deliberately not put.
+                ctx.setTime(at[3]);
+                assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
+
+                ctx.setTime(at[4]);
+                store.put(4, "four");
+                assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
+
+                ctx.setTime(at[5]);
+                store.put(5, "five");
+                assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
+
+                assertEquals(Utils.mkList("zero"), toList(store.fetch(0, at[0])));
+                assertEquals(Utils.mkList("one"), toList(store.fetch(1, at[1])));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, at[2])));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, at[3])));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, at[4])));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, at[5])));
+
+                // Writing at at[6] drops segment 2, taking keys 0 and 1 with it.
+                ctx.setTime(at[6]);
+                store.put(6, "six");
+                assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, at[0])));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, at[1])));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, at[2])));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, at[3])));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, at[4])));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, at[5])));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, at[6])));
+
+                ctx.setTime(at[7]);
+                store.put(7, "seven");
+                assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, at[0])));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, at[1])));
+                assertEquals(Utils.mkList("two"), toList(store.fetch(2, at[2])));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, at[3])));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, at[4])));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, at[5])));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, at[6])));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, at[7])));
+
+                // Writing at at[8] drops segment 3, taking key 2 with it.
+                ctx.setTime(at[8]);
+                store.put(8, "eight");
+                assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, at[0])));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, at[1])));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, at[2])));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, at[3])));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, at[4])));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, at[5])));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, at[6])));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, at[7])));
+                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, at[8])));
+
+                // Only the directories of the live segments should be present on disk.
+                store.flush();
+                assertEquals(
+                        Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)),
+                        segmentDirs(tmpDir)
+                );
+            } finally {
+                store.close();
+            }
+
+        } finally {
+            Utils.delete(tmpDir);
+        }
+    }
+
+    /**
+     * Verifies that a window store can be rebuilt from its change log.
+     *
+     * Phase 1 writes nine entries, half a segment apart in stream time, while capturing everything
+     * the store logs. Phase 2 creates a new store in a separate, fresh state directory. It restores
+     * that store from the captured change log and checks the expected segment ids, fetchable
+     * entries and on-disk segment directories.
+     */
+    @Test
+    public void testRestore() throws IOException {
+        final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+        long startTime = segmentSize * 2;
+        long incr = segmentSize / 2;
+        String[] values = {"zero", "one", "two", "three", "four", "five", "six", "seven", "eight"};
+
+        // Phase 1: populate a store and capture its change log.
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    changeLog.add(new Entry<>(
+                            keySerializer.serialize(record.topic(), record.key()),
+                            valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            try {
+                for (int i = 0; i < values.length; i++) {
+                    context.setTime(startTime + incr * i);
+                    store.put(i, values[i]);
+                }
+                store.flush();
+            } finally {
+                store.close();
+            }
+        } finally {
+            Utils.delete(baseDir);
+        }
+
+        // Phase 2: restore a new store, in its own state directory, from the captured change log.
+        File baseDir2 = Files.createTempDirectory("test").toFile();
+        try {
+            // The restored store logs into a separate list. This keeps the list replayed by
+            // context.restore() from being modified while it is iterated.
+            final List<Entry<byte[], byte[]>> restoredChangeLog = new ArrayList<>();
+            Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+            RecordCollector recordCollector = new RecordCollector(producer) {
+                @Override
+                public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+                    restoredChangeLog.add(new Entry<>(
+                            keySerializer.serialize(record.topic(), record.key()),
+                            valueSerializer.serialize(record.topic(), record.value()))
+                    );
+                }
+            };
+
+            // Must be baseDir2. baseDir was already deleted at the end of phase 1, and only
+            // baseDir2 is cleaned up below.
+            MockProcessorContext context = new MockProcessorContext(
+                    null, baseDir2,
+                    byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+                    recordCollector);
+
+            WindowStore<Integer, String> store = createWindowStore(context, windowSize, windowSize, serdes);
+            try {
+                RocksDBWindowStore<Integer, String> inner =
+                        (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+
+                context.restore("window", changeLog);
+
+                assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
+
+                assertEquals(Utils.mkList(), toList(store.fetch(0, startTime)));
+                assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr)));
+                assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2)));
+                assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3)));
+                assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4)));
+                assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5)));
+                assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6)));
+                assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7)));
+                assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8)));
+
+                // Check the segment directories of the restored store in its own state directory.
+                store.flush();
+                assertEquals(
+                        Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)),
+                        segmentDirs(baseDir2)
+                );
+            } finally {
+                store.close();
+            }
+        } finally {
+            Utils.delete(baseDir2);
+        }
+    }
+
+    /**
+     * Drains the remaining elements of the iterator into a new list, in iteration order.
+     * NOTE(review): the iterator is not closed here. If WindowStoreIterator holds store
+     * resources, consider closing it after draining (confirm against its interface).
+     */
+    private <E> List<E> toList(WindowStoreIterator<E> iterator) {
+        List<E> drained = new ArrayList<>();
+        boolean more = iterator.hasNext();
+        while (more) {
+            drained.add(iterator.next());
+            more = iterator.hasNext();
+        }
+        return drained;
+    }
+
+ private Set<String> segmentDirs(File baseDir) {
+ File rocksDbDir = new File(baseDir, "rocksdb");
+ String[] subdirs = rocksDbDir.list();
+
+ HashSet<String> set = new HashSet<>();
+
+ for (String subdir : subdirs) {
+ if (subdir.startsWith("window-"))
+ set.add(subdir.substring(7));
+ }
+ return set;
+ }
+
+    /**
+     * Decodes serialized change-log records and groups them by window key.
+     *
+     * Each value is rendered as {@code value@offset}, where offset is the record's timestamp
+     * minus {@code startTime}. A record with a null value is rendered with the string "null".
+     */
+    private Map<Integer, Set<String>> entriesByKey(List<Entry<byte[], byte[]>> changeLog, long startTime) {
+        Map<Integer, Set<String>> grouped = new HashMap<>();
+
+        for (Entry<byte[], byte[]> record : changeLog) {
+            byte[] binaryKey = record.key();
+            byte[] binaryValue = record.value();
+
+            long offset = WindowStoreUtil.timestampFromBinaryKey(binaryKey) - startTime;
+            Integer key = WindowStoreUtil.keyFromBinaryKey(binaryKey, serdes);
+            String value = null;
+            if (binaryValue != null) {
+                value = serdes.valueFrom(binaryValue);
+            }
+
+            if (!grouped.containsKey(key)) {
+                grouped.put(key, new HashSet<String>());
+            }
+            grouped.get(key).add(value + "@" + offset);
+        }
+
+        return grouped;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
index c7ea748..f342dcd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/StateUtils.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams.state;
-import org.apache.kafka.test.TestUtils;
-
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
@@ -39,17 +37,21 @@ public class StateUtils {
* @return the new directory that will exist; never null
*/
public static File tempDir() {
- final File dir = new File(TestUtils.IO_TMP_DIR, "kafka-" + INSTANCE_COUNTER.incrementAndGet());
- dir.mkdirs();
- dir.deleteOnExit();
+ try {
+ final File dir = Files.createTempDirectory("test").toFile();
+ dir.mkdirs();
+ dir.deleteOnExit();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- deleteDirectory(dir);
- }
- });
- return dir;
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ deleteDirectory(dir);
+ }
+ });
+ return dir;
+ } catch (IOException ex) {
+ throw new RuntimeException("failed to create a temp dir", ex);
+ }
}
private static void deleteDirectory(File dir) {
@@ -74,4 +76,4 @@ public class StateUtils {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b0b3e5ae/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index a6c2759..5275545 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -53,8 +53,8 @@ public class KStreamTestDriver {
Serializer<?> keySerializer, Deserializer<?> keyDeserializer,
Serializer<?> valSerializer, Deserializer<?> valDeserializer) {
this.topology = builder.build(null);
- this.context = new MockProcessorContext(this, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());
this.stateDir = stateDir;
+ this.context = new MockProcessorContext(this, stateDir, keySerializer, keyDeserializer, valSerializer, valDeserializer, new MockRecordCollector());
for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) {
StateStore store = stateStoreSupplier.get();