You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2021/04/18 18:19:46 UTC
[kafka] branch trunk updated: KAFKA-10847: Delete Time-ordered
duplicated records using deleteRange() internally (#10537)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 15c24da KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally (#10537)
15c24da is described below
commit 15c24da888bc0a2582fd84d89fa1b84354fede64
Author: Sergio Peña <se...@confluent.io>
AuthorDate: Sun Apr 18 13:18:09 2021 -0500
KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally (#10537)
This PR changes the TimeOrderedKeySchema composite key from time-seq-key to time-key-seq to allow deletion of duplicated time-key records using the RocksDB deleteRange API. It also removes all duplicates when put(key, null) is called. Previously, put(key, null) was a no-op, which caused problems because there was no way to delete any keys when duplicates were allowed.
The RocksDB deleteRange(keyFrom, keyTo) deletes a range of keys from keyFrom (inclusive) to keyTo (exclusive). To make keyTo inclusive, I incremented the end key by one when calling the RocksDBAccessor.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../AbstractRocksDBSegmentedBytesStore.java | 9 +
.../streams/state/internals/KeyValueSegment.java | 6 +
.../streams/state/internals/RocksDBStore.java | 26 +++
.../internals/RocksDBTimeOrderedWindowStore.java | 29 ++-
.../state/internals/RocksDBTimestampedStore.java | 16 ++
.../kafka/streams/state/internals/Segment.java | 1 +
.../state/internals/SegmentedBytesStore.java | 20 ++
.../state/internals/TimeOrderedKeySchema.java | 106 +++++------
.../state/internals/TimestampedSegment.java | 6 +
.../AbstractRocksDBSegmentedBytesStoreTest.java | 2 +-
.../RocksDBTimeOrderedWindowStoreTest.java | 205 +++++++++++++++++++--
.../state/internals/TimeOrderedKeySchemaTest.java | 93 ----------
12 files changed, 325 insertions(+), 194 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 4c545a5..51fb3ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -197,6 +197,15 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
}
@Override
+ public void remove(final Bytes key, final long timestamp) {
+ final Bytes keyBytes = keySchema.toStoreBinaryKeyPrefix(key, timestamp);
+ final S segment = segments.getSegmentForTimestamp(timestamp);
+ if (segment != null) {
+ segment.deleteRange(keyBytes, keyBytes);
+ }
+ }
+
+ @Override
public void put(final Bytes key,
final byte[] value) {
final long timestamp = keySchema.segmentTimestamp(key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
index b6d6504..66c55fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -41,6 +42,11 @@ class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment
}
@Override
+ public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+ super.deleteRange(keyFrom, keyTo);
+ }
+
+ @Override
public int compareTo(final KeyValueSegment segment) {
return Long.compare(id, segment.id);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 12ba4eb..c03f86d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -349,6 +349,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
return oldValue;
}
+ void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+ Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
+ Objects.requireNonNull(keyTo, "keyTo cannot be null");
+
+ validateStoreOpen();
+
+ // End of key is exclusive, so we increment it by 1 byte to make keyTo inclusive
+ dbAccessor.deleteRange(keyFrom.get(), Bytes.increment(keyTo).get());
+ }
+
@Override
public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
@@ -524,6 +534,12 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
final Bytes to,
final boolean forward);
+ /**
+ * Deletes key entries in the range ['from', 'to'), including 'from' and excluding 'to'.
+ */
+ void deleteRange(final byte[] from,
+ final byte[] to);
+
KeyValueIterator<Bytes, byte[]> all(final boolean forward);
KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);
@@ -604,6 +620,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
}
@Override
+ public void deleteRange(final byte[] from, final byte[] to) {
+ try {
+ db.deleteRange(columnFamily, wOptions, from, to);
+ } catch (final RocksDBException e) {
+ // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
+ throw new ProcessorStateException("Error while removing key from store " + name, e);
+ }
+ }
+
+ @Override
public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
final RocksIterator innerIterWithTimestamp = db.newIterator(columnFamily);
if (forward) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
index 3efeb32..f8ba883 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
@@ -52,29 +52,29 @@ public class RocksDBTimeOrderedWindowStore
@Override
public void put(final Bytes key, final byte[] value, final long timestamp) {
- // Skip if value is null and duplicates are allowed since this delete is a no-op
if (!(value == null && retainDuplicates)) {
maybeUpdateSeqnumForDups();
wrapped().put(TimeOrderedKeySchema.toStoreKeyBinary(key, timestamp, seqnum), value);
+ } else {
+ // Delete all duplicates for the specified key and timestamp
+ wrapped().remove(key, timestamp);
}
}
@Override
public byte[] fetch(final Bytes key, final long timestamp) {
- return wrapped().get(TimeOrderedKeySchema.toStoreKeyBinary(key, timestamp, seqnum));
+ throw new UnsupportedOperationException();
}
- @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
+ @SuppressWarnings("deprecation")
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
- final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(key, timeFrom, timeTo);
- return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).valuesIterator();
+ throw new UnsupportedOperationException();
}
@Override
public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final long timeFrom, final long timeTo) {
- final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetch(key, timeFrom, timeTo);
- return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).valuesIterator();
+ throw new UnsupportedOperationException();
}
@SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@@ -83,8 +83,7 @@ public class RocksDBTimeOrderedWindowStore
final Bytes keyTo,
final long timeFrom,
final long timeTo) {
- final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo);
- return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator();
+ throw new UnsupportedOperationException();
}
@Override
@@ -92,8 +91,7 @@ public class RocksDBTimeOrderedWindowStore
final Bytes keyTo,
final long timeFrom,
final long timeTo) {
- final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
- return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator();
+ throw new UnsupportedOperationException();
}
@Override
@@ -104,21 +102,18 @@ public class RocksDBTimeOrderedWindowStore
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
- final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardAll();
- return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator();
+ throw new UnsupportedOperationException();
}
@SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
- final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(timeFrom, timeTo);
- return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator();
+ throw new UnsupportedOperationException();
}
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long timeFrom, final long timeTo) {
- final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetchAll(timeFrom, timeTo);
- return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator();
+ throw new UnsupportedOperationException();
}
private void maybeUpdateSeqnumForDups() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index db3d175..dafbc8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -206,6 +206,22 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
}
@Override
+ public void deleteRange(final byte[] from, final byte[] to) {
+ try {
+ db.deleteRange(oldColumnFamily, wOptions, from, to);
+ } catch (final RocksDBException e) {
+ // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
+ throw new ProcessorStateException("Error while removing key from store " + name, e);
+ }
+ try {
+ db.deleteRange(newColumnFamily, wOptions, from, to);
+ } catch (final RocksDBException e) {
+ // String format is happening in wrapping stores. So formatted message is thrown from wrapping stores.
+ throw new ProcessorStateException("Error while removing key from store " + name, e);
+ }
+ }
+
+ @Override
public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
final RocksIterator innerIterWithTimestamp = db.newIterator(newColumnFamily);
final RocksIterator innerIterNoTimestamp = db.newIterator(oldColumnFamily);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index 9fddc16..ea3b89a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -25,4 +25,5 @@ public interface Segment extends KeyValueStore<Bytes, byte[]>, BatchWritingStore
void destroy() throws IOException;
+ void deleteRange(Bytes keyFrom, Bytes keyTo);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index 79ada1f..4519929 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -110,6 +110,14 @@ public interface SegmentedBytesStore extends StateStore {
void remove(Bytes key);
/**
+ * Remove all duplicated records with the provided key at the specified timestamp.
+ *
+ * @param key the segmented key to remove
+ * @param timestamp the timestamp to match
+ */
+ void remove(Bytes key, long timestamp);
+
+ /**
* Write a new value to the store with the provided key. The key
* should be a composite of the record key, and the timestamp information etc
* as described by the {@link KeySchema}
@@ -152,6 +160,18 @@ public interface SegmentedBytesStore extends StateStore {
Bytes lowerRange(final Bytes key, final long from);
/**
+ * Given a record key and a time, construct a Segmented key to search when performing
+ * prefixed queries.
+ *
+ * @param key
+ * @param timestamp
+ * @return The key that represents the prefixed Segmented key in bytes.
+ */
+ default Bytes toStoreBinaryKeyPrefix(final Bytes key, long timestamp) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* Given a range of fixed size record keys and a time, construct a Segmented key that represents
* the upper range of keys to search when performing range queries.
* @see SessionKeySchema#upperRange
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
index b834b3a..f191d2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
@@ -30,62 +30,40 @@ import java.util.List;
/**
* A {@link RocksDBSegmentedBytesStore.KeySchema} to serialize/deserialize a RocksDB store
- * key into a schema combined of (time,seq,key). This key schema is more efficient when doing
- * range queries between a time interval. For key range queries better use {@link WindowKeySchema}.
+ * key into a schema combined of (time,key,seq). Since key is variable length while time/seq is
+ * fixed length, when formatting in this order, varying time range query would be very inefficient
+ * since we'd need to be very conservative in picking the from / to boundaries; however for now
+ * we do not expect any varying time range access at all, only fixed time range access.
*/
public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
private static final Logger LOG = LoggerFactory.getLogger(TimeOrderedKeySchema.class);
private static final int TIMESTAMP_SIZE = 8;
private static final int SEQNUM_SIZE = 4;
- private static final int PREFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
- /**
- * {@inheritdoc}
- *
- * Queries using the {@link TimeOrderedKeySchema} are optimized for time range queries only. Key
- * range queries may be slower. If better performance on key range queries are necessary, then
- * use the {@link WindowKeySchema}.
- */
@Override
public Bytes upperRange(final Bytes key, final long to) {
- return toStoreKeyBinary(key.get(), to, Integer.MAX_VALUE);
+ throw new UnsupportedOperationException();
}
- /**
- * {@inheritdoc}
- *
- * Queries using the {@link TimeOrderedKeySchema} are optimized for time range queries only. Key
- * range queries may be slower. If better performance on key range queries are necessary, then
- * use the {@link WindowKeySchema}.
- */
@Override
public Bytes lowerRange(final Bytes key, final long from) {
- return toStoreKeyBinary(key.get(), from, 0);
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Bytes toStoreBinaryKeyPrefix(final Bytes key, final long timestamp) {
+ return toStoreKeyBinaryPrefix(key, timestamp);
}
- /**
- * {@inheritdoc}
- *
- * Queries using the {@link TimeOrderedKeySchema} are optimized for time range queries only. Key
- * range queries may be slower. If better performance on key range queries are necessary, then
- * use the {@link WindowKeySchema}.
- */
@Override
public Bytes upperRangeFixedSize(final Bytes key, final long to) {
- return toStoreKeyBinary(key, to, Integer.MAX_VALUE);
+ throw new UnsupportedOperationException();
}
- /**
- * {@inheritdoc}
- *
- * Queries using the {@link TimeOrderedKeySchema} are optimized for time range queries only. Key
- * range queries may be slower. If better performance on key range queries are necessary, then
- * use the {@link WindowKeySchema}.
- */
@Override
public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
- return toStoreKeyBinary(key, Math.max(0, from), 0);
+ throw new UnsupportedOperationException();
}
@Override
@@ -96,31 +74,36 @@ public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchem
/**
* {@inheritdoc}
*
- * This method is not optimized for {@link TimeOrderedKeySchema}. The method may do unnecessary
- * checks to find the next record.
+ * This method is optimized for {@link RocksDBTimeOrderedWindowStore#all()} only. Key and time
+ * range queries are not supported.
*/
@Override
public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
- return iterator -> {
- while (iterator.hasNext()) {
- final Bytes bytes = iterator.peekNextKey();
- final Bytes keyBytes = Bytes.wrap(extractStoreKeyBytes(bytes.get()));
- final long time = extractStoreTimestamp(bytes.get());
- if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
- && (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
- && time >= from
- && time <= to) {
- return true;
- }
- iterator.next();
- }
- return false;
- };
+ if (binaryKeyFrom != null || binaryKeyTo != null) {
+ throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys cannot be non-null. Key and time range queries are not supported.");
+ }
+
+ if (from != 0 && to != Long.MAX_VALUE) {
+ throw new IllegalArgumentException("from/to time ranges should be 0 to Long.MAX_VALUE. Key and time range queries are not supported.");
+ }
+
+ return iterator -> iterator.hasNext();
}
@Override
public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments, final long from, final long to, final boolean forward) {
- return segments.segments(from, to, forward);
+ throw new UnsupportedOperationException();
+ }
+
+ public static Bytes toStoreKeyBinaryPrefix(final Bytes key,
+ final long timestamp) {
+ final byte[] serializedKey = key.get();
+
+ final ByteBuffer buf = ByteBuffer.allocate(TIMESTAMP_SIZE + serializedKey.length);
+ buf.putLong(timestamp);
+ buf.put(serializedKey);
+
+ return Bytes.wrap(buf.array());
}
public static Bytes toStoreKeyBinary(final Bytes key,
@@ -149,30 +132,23 @@ public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchem
final int seqnum) {
final ByteBuffer buf = ByteBuffer.allocate(TIMESTAMP_SIZE + serializedKey.length + SEQNUM_SIZE);
buf.putLong(timestamp);
- buf.putInt(seqnum);
buf.put(serializedKey);
+ buf.putInt(seqnum);
return Bytes.wrap(buf.array());
}
static byte[] extractStoreKeyBytes(final byte[] binaryKey) {
final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
- System.arraycopy(binaryKey, PREFIX_SIZE, bytes, 0, bytes.length);
+ System.arraycopy(binaryKey, TIMESTAMP_SIZE, bytes, 0, bytes.length);
return bytes;
}
- static <K> K extractStoreKey(final byte[] binaryKey,
- final StateSerdes<K, ?> serdes) {
- final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
- System.arraycopy(binaryKey, PREFIX_SIZE, bytes, 0, bytes.length);
- return serdes.keyFrom(bytes);
- }
-
static long extractStoreTimestamp(final byte[] binaryKey) {
return ByteBuffer.wrap(binaryKey).getLong(0);
}
static int extractStoreSequence(final byte[] binaryKey) {
- return ByteBuffer.wrap(binaryKey).getInt(TIMESTAMP_SIZE);
+ return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
}
static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
@@ -202,8 +178,8 @@ public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchem
* Safely construct a time window of the given size,
* taking care of bounding endMs to Long.MAX_VALUE if necessary
*/
- static TimeWindow timeWindowForSize(final long startMs,
- final long windowSize) {
+ private static TimeWindow timeWindowForSize(final long startMs,
+ final long windowSize) {
long endMs = startMs + windowSize;
if (endMs < 0) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
index 36fe0e5..f0e4cf6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
@@ -41,6 +42,11 @@ class TimestampedSegment extends RocksDBTimestampedStore implements Comparable<T
}
@Override
+ public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public int compareTo(final TimestampedSegment segment) {
return Long.compare(id, segment.id);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index a1344f6..53d1846 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -93,7 +93,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
@Parameters(name = "{0}")
public static Object[] getKeySchemas() {
- return new Object[] {new SessionKeySchema(), new WindowKeySchema(), new TimeOrderedKeySchema()};
+ return new Object[] {new SessionKeySchema(), new WindowKeySchema()};
}
@Before
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
index 37f98f1..2646c4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
@@ -16,18 +16,68 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.File;
+
+import static java.util.Arrays.asList;
import static org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.WindowStoreTypes;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.junit.Assert.assertEquals;
+
+public class RocksDBTimeOrderedWindowStoreTest {
+ private static final long WINDOW_SIZE = 3L;
+ private static final long SEGMENT_INTERVAL = 60_000L;
+ private static final long RETENTION_PERIOD = 2 * SEGMENT_INTERVAL;
+
+ private static final String STORE_NAME = "rocksDB time-ordered window store";
+
+ WindowStore<Integer, String> windowStore;
+ InternalMockProcessorContext context;
+ MockRecordCollector recordCollector;
+
+ final File baseDir = TestUtils.tempDirectory("test");
-public class RocksDBTimeOrderedWindowStoreTest extends RocksDBWindowStoreTest {
- private static final String STORE_NAME = "rocksDB window store";
+ @Before
+ public void setup() {
+ windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
+
+ recordCollector = new MockRecordCollector();
+ context = new InternalMockProcessorContext(
+ baseDir,
+ Serdes.String(),
+ Serdes.Integer(),
+ recordCollector,
+ new ThreadCache(
+ new LogContext("testCache"),
+ 0,
+ new MockStreamsMetrics(new Metrics())));
+ context.setTime(1L);
+
+ windowStore.init((StateStoreContext) context, windowStore);
+ }
+
+ @After
+ public void after() {
+ windowStore.close();
+ }
- @Override
<K, V> WindowStore<K, V> buildWindowStore(final long retentionPeriod,
final long windowSize,
final boolean retainDuplicates,
@@ -47,24 +97,143 @@ public class RocksDBTimeOrderedWindowStoreTest extends RocksDBWindowStoreTest {
.build();
}
- @Override
- String getMetricsScope() {
- return new RocksDbWindowBytesStoreSupplier(null, 0, 0, 0, false, WindowStoreTypes.TIME_ORDERED_WINDOW_STORE).metricsScope();
+ @Test
+ public void shouldGetAll() {
+ final long startTime = SEGMENT_INTERVAL - 4L;
+
+ windowStore.put(0, "zero", startTime + 0);
+ windowStore.put(1, "one", startTime + 1);
+ windowStore.put(2, "two", startTime + 2);
+
+ final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
+ final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
+ final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
+
+ assertEquals(
+ asList(zero, one, two),
+ toList(windowStore.all())
+ );
+ }
+
+ @Test
+ public void shouldGetAllDuplicates() {
+ final long startTime = SEGMENT_INTERVAL - 4L;
+
+ windowStore.put(0, "zero1", startTime + 0);
+ windowStore.put(0, "zero2", startTime + 0);
+ windowStore.put(0, "zero3", startTime + 0);
+
+ final KeyValue<Windowed<Integer>, String> zero1 = windowedPair(0, "zero1", startTime + 0);
+ final KeyValue<Windowed<Integer>, String> zero2 = windowedPair(0, "zero2", startTime + 0);
+ final KeyValue<Windowed<Integer>, String> zero3 = windowedPair(0, "zero3", startTime + 0);
+
+ assertEquals(
+ asList(zero1, zero2, zero3),
+ toList(windowStore.all())
+ );
+ }
+
+ @Test
+ public void shouldGetAllNonDeletedRecords() {
+ final long startTime = SEGMENT_INTERVAL - 4L;
+
+ // Add some records
+ windowStore.put(0, "zero", startTime + 0);
+ windowStore.put(1, "one", startTime + 1);
+ windowStore.put(2, "two", startTime + 2);
+ windowStore.put(3, "three", startTime + 3);
+ windowStore.put(4, "four", startTime + 4);
+
+ // Delete some records
+ windowStore.put(1, null, startTime + 1);
+ windowStore.put(3, null, startTime + 3);
+
+ // Only non-deleted records should appear in the all() iterator
+ final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
+ final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
+ final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
+
+ assertEquals(
+ asList(zero, two, four),
+ toList(windowStore.all())
+ );
+ }
+
+ @Test
+ public void shouldDeleteAllDuplicates() {
+ final long startTime = SEGMENT_INTERVAL - 4L;
+
+ windowStore.put(0, "zero1", startTime + 0);
+ windowStore.put(0, "zero2", startTime + 0);
+ windowStore.put(0, "zero3", startTime + 0);
+ windowStore.put(1, "one1", startTime + 1);
+ windowStore.put(1, "one2", startTime + 1);
+
+ windowStore.put(0, null, startTime + 0);
+
+ final KeyValue<Windowed<Integer>, String> one1 = windowedPair(1, "one1", startTime + 1);
+ final KeyValue<Windowed<Integer>, String> one2 = windowedPair(1, "one2", startTime + 1);
+
+ assertEquals(
+ asList(one1, one2),
+ toList(windowStore.all())
+ );
}
- @Override
- void setClassLoggerToDebug() {
- LogCaptureAppender.setClassLoggerToDebug(AbstractRocksDBSegmentedBytesStore.class);
+ @Test
+ public void shouldGetAllReturnTimestampOrderedRecords() {
+ final long startTime = SEGMENT_INTERVAL - 4L;
+
+ // Add some records in different order
+ windowStore.put(4, "four", startTime + 4);
+ windowStore.put(0, "zero", startTime + 0);
+ windowStore.put(2, "two1", startTime + 2);
+ windowStore.put(3, "three", startTime + 3);
+ windowStore.put(1, "one", startTime + 1);
+
+ // Add duplicates
+ windowStore.put(2, "two2", startTime + 2);
+
+ // Records should be returned in timestamp order, with duplicates kept in insertion order
+ final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
+ final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
+ final KeyValue<Windowed<Integer>, String> two1 = windowedPair(2, "two1", startTime + 2);
+ final KeyValue<Windowed<Integer>, String> two2 = windowedPair(2, "two2", startTime + 2);
+ final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", startTime + 3);
+ final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
+
+ assertEquals(
+ asList(zero, one, two1, two2, three, four),
+ toList(windowStore.all())
+ );
+ }
+
+ @Test
+ public void shouldEarlyClosedIteratorStillGetAllRecords() {
+ final long startTime = SEGMENT_INTERVAL - 4L;
+
+ windowStore.put(0, "zero", startTime + 0);
+ windowStore.put(1, "one", startTime + 1);
+
+ final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
+ final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
+
+ final KeyValueIterator<Windowed<Integer>, String> it = windowStore.all();
+ assertEquals(zero, it.next());
+ it.close();
+
+ // A new all() iterator after a previous all() iterator was closed should return all elements.
+ assertEquals(
+ asList(zero, one),
+ toList(windowStore.all())
+ );
}
- @Override
- long extractStoreTimestamp(final byte[] binaryKey) {
- return TimeOrderedKeySchema.extractStoreTimestamp(binaryKey);
+ private static <K, V> KeyValue<Windowed<K>, V> windowedPair(final K key, final V value, final long timestamp) {
+ return windowedPair(key, value, timestamp, WINDOW_SIZE);
}
- @Override
- <K> K extractStoreKey(final byte[] binaryKey,
- final StateSerdes<K, ?> serdes) {
- return TimeOrderedKeySchema.extractStoreKey(binaryKey, serdes);
+ private static <K, V> KeyValue<Windowed<K>, V> windowedPair(final K key, final V value, final long timestamp, final long windowSize) {
+ return KeyValue.pair(new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize)), value);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchemaTest.java
index 5a5c4fc..03f81e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchemaTest.java
@@ -19,20 +19,12 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.KeyValueIteratorStub;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -44,94 +36,9 @@ public class TimeOrderedKeySchemaTest {
final private Window window = new TimeWindow(startTime, endTime);
final private Windowed<String> windowedKey = new Windowed<>(key, window);
- final private TimeOrderedKeySchema timeOrderedKeySchema = new TimeOrderedKeySchema();
final private StateSerdes<String, byte[]> stateSerdes = new StateSerdes<>("dummy", serde, Serdes.ByteArray());
@Test
- public void testHasNextConditionUsingNullKeys() {
- final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(
- KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0, 0}), new TimeWindow(0, 1)), 0), 1),
- KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0}), new TimeWindow(0, 1)), 0), 2),
- KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0, 0, 0}), new TimeWindow(0, 1)), 0), 3),
- KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0}), new TimeWindow(10, 20)), 4), 4),
- KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0, 0}), new TimeWindow(10, 20)), 5), 5),
- KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0, 0, 0}), new TimeWindow(10, 20)), 6), 6));
- final DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator()));
-
- final HasNextCondition hasNextCondition = timeOrderedKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE);
- final List<Integer> results = new ArrayList<>();
- while (hasNextCondition.hasNext(iterator)) {
- results.add(iterator.next().value);
- }
- assertThat(results, equalTo(Arrays.asList(1, 2, 3, 4, 5, 6)));
- }
-
- @Test
- public void testUpperBoundWithLargeTimestamps() {
- final Bytes upper = timeOrderedKeySchema.upperRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), Long.MAX_VALUE);
-
- assertThat(
- "shorter key with max timestamp should be in range",
- upper.compareTo(
- TimeOrderedKeySchema.toStoreKeyBinary(
- new byte[] {0xA},
- Long.MAX_VALUE,
- Integer.MAX_VALUE
- )
- ) >= 0
- );
-
- assertThat(
- "shorter key with max timestamp should be in range",
- upper.compareTo(
- TimeOrderedKeySchema.toStoreKeyBinary(
- new byte[] {0xA, 0xB},
- Long.MAX_VALUE,
- Integer.MAX_VALUE
- )
- ) >= 0
- );
-
- assertThat(upper, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, Long.MAX_VALUE, Integer.MAX_VALUE)));
- }
-
- @Test
- public void testUpperBoundWithZeroTimestamp() {
- final Bytes upper = timeOrderedKeySchema.upperRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), 0);
- assertThat(upper, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, 0, Integer.MAX_VALUE)));
- }
-
- @Test
- public void testLowerBoundWithZeroTimestamp() {
- final Bytes lower = timeOrderedKeySchema.lowerRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), 0);
- assertThat(lower, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, 0, 0)));
- }
-
- @Test
- public void testLowerBoundWithNonZeroTimestamp() {
- final Bytes lower = timeOrderedKeySchema.lowerRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), 42);
- assertThat(lower, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, 42, 0)));
- }
-
- @Test
- public void testLowerBoundMatchesTrailingZeros() {
- final Bytes lower = timeOrderedKeySchema.lowerRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), Long.MAX_VALUE - 1);
-
- assertThat(
- "appending zeros to key should still be in range",
- lower.compareTo(
- TimeOrderedKeySchema.toStoreKeyBinary(
- new byte[] {0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0},
- Long.MAX_VALUE - 1,
- 0
- )
- ) < 0
- );
-
- assertThat(lower, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, Long.MAX_VALUE - 1, 0)));
- }
-
- @Test
public void shouldConvertToBinaryAndBack() {
final Bytes serialized = TimeOrderedKeySchema.toStoreKeyBinary(windowedKey, 0, stateSerdes);
final Windowed<String> result = TimeOrderedKeySchema.fromStoreKey(serialized.get(), endTime - startTime, stateSerdes.keyDeserializer(), stateSerdes.topic());