You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2021/05/05 10:28:54 UTC
[kafka] branch trunk updated: KAFKA-10767: Adding test cases for
all, reverseAll and reverseRange for ThreadCache (#9779)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9a71468 KAFKA-10767: Adding test cases for all, reverseAll and reverseRange for ThreadCache (#9779)
9a71468 is described below
commit 9a71468cb0e5fc9faeba39b56f36d0c93ca73c59
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Wed May 5 15:56:51 2021 +0530
KAFKA-10767: Adding test cases for all, reverseAll and reverseRange for ThreadCache (#9779)
The test cases for ThreaCache didn't have the corresponding unit tests for all, reverseAll and reverseRange methods. This PR aims to add the same.
Reviewers: Bruno Cadonna <ca...@apache.org>
---
.../streams/state/internals/ThreadCacheTest.java | 225 +++++++++++++++------
1 file changed, 168 insertions(+), 57 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index afd5449..c449de9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import static org.hamcrest.MatcherAssert.assertThat;
import org.junit.Test;
import java.util.ArrayList;
@@ -29,7 +30,9 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.function.Supplier;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -42,19 +45,20 @@ public class ThreadCacheTest {
final String namespace1 = "0.1-namespace";
final String namespace2 = "0.2-namespace";
private final LogContext logContext = new LogContext("testCache ");
+ private final byte[][] bytes = new byte[][]{{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
@Test
public void basicPutGet() {
final List<KeyValue<String, String>> toInsert = Arrays.asList(
- new KeyValue<>("K1", "V1"),
- new KeyValue<>("K2", "V2"),
- new KeyValue<>("K3", "V3"),
- new KeyValue<>("K4", "V4"),
- new KeyValue<>("K5", "V5"));
+ new KeyValue<>("K1", "V1"),
+ new KeyValue<>("K2", "V2"),
+ new KeyValue<>("K3", "V3"),
+ new KeyValue<>("K4", "V4"),
+ new KeyValue<>("K5", "V5"));
final KeyValue<String, String> kv = toInsert.get(0);
final ThreadCache cache = new ThreadCache(logContext,
- toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
- new MockStreamsMetrics(new Metrics()));
+ toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
+ new MockStreamsMetrics(new Metrics()));
for (final KeyValue<String, String> kvToInsert : toInsert) {
final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
@@ -132,35 +136,35 @@ public class ThreadCacheTest {
static int memoryCacheEntrySize(final byte[] key, final byte[] value, final String topic) {
return key.length +
- value.length +
- 1 + // isDirty
- 8 + // timestamp
- 8 + // offset
- 4 +
- topic.length() +
- // LRU Node entries
- key.length +
- 8 + // entry
- 8 + // previous
- 8; // next
+ value.length +
+ 1 + // isDirty
+ 8 + // timestamp
+ 8 + // offset
+ 4 +
+ topic.length() +
+ // LRU Node entries
+ key.length +
+ 8 + // entry
+ 8 + // previous
+ 8; // next
}
@Test
public void evict() {
final List<KeyValue<String, String>> received = new ArrayList<>();
final List<KeyValue<String, String>> expected = Collections.singletonList(
- new KeyValue<>("K1", "V1"));
+ new KeyValue<>("K1", "V1"));
final List<KeyValue<String, String>> toInsert = Arrays.asList(
- new KeyValue<>("K1", "V1"),
- new KeyValue<>("K2", "V2"),
- new KeyValue<>("K3", "V3"),
- new KeyValue<>("K4", "V4"),
- new KeyValue<>("K5", "V5"));
+ new KeyValue<>("K1", "V1"),
+ new KeyValue<>("K2", "V2"),
+ new KeyValue<>("K3", "V3"),
+ new KeyValue<>("K4", "V4"),
+ new KeyValue<>("K5", "V5"));
final KeyValue<String, String> kv = toInsert.get(0);
final ThreadCache cache = new ThreadCache(logContext,
- memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
- new MockStreamsMetrics(new Metrics()));
+ memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
+ new MockStreamsMetrics(new Metrics()));
cache.addDirtyEntryFlushListener(namespace, dirty -> {
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue())));
@@ -233,46 +237,88 @@ public class ThreadCacheTest {
assertArrayEquals(name1Byte.get(), cache.get(namespace2, nameByte).value());
}
+ private ThreadCache setupThreadCache(final int first, final int last, final long entrySize, final boolean reverse) {
+ final ThreadCache cache = new ThreadCache(logContext, entrySize, new MockStreamsMetrics(new Metrics()));
+ cache.addDirtyEntryFlushListener(namespace, dirty -> { });
+ int index = first;
+ while ((!reverse && index < last) || (reverse && index >= last)) {
+ cache.put(namespace, Bytes.wrap(bytes[index]), dirtyEntry(bytes[index]));
+ if (!reverse)
+ index++;
+ else
+ index--;
+ }
+ return cache;
+ }
+
@Test
public void shouldPeekNextKey() {
- final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
+ final ThreadCache cache = setupThreadCache(0, 1, 10000L, false);
final Bytes theByte = Bytes.wrap(new byte[]{0});
- cache.put(namespace, theByte, dirtyEntry(theByte.get()));
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1}));
assertEquals(theByte, iterator.peekNextKey());
assertEquals(theByte, iterator.peekNextKey());
}
@Test
+ public void shouldPeekNextKeyReverseRange() {
+ final ThreadCache cache = setupThreadCache(1, 1, 10000L, true);
+ final Bytes theByte = Bytes.wrap(new byte[]{1});
+ final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte);
+ assertThat(iterator.peekNextKey(), is(theByte));
+ assertThat(iterator.peekNextKey(), is(theByte));
+ }
+
+ @Test
public void shouldGetSameKeyAsPeekNext() {
- final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
+ final ThreadCache cache = setupThreadCache(0, 1, 10000L, false);
final Bytes theByte = Bytes.wrap(new byte[]{0});
- cache.put(namespace, theByte, dirtyEntry(theByte.get()));
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1}));
- assertEquals(iterator.peekNextKey(), iterator.next().key);
+ assertThat(iterator.peekNextKey(), is(iterator.next().key));
}
@Test
- public void shouldThrowIfNoPeekNextKey() {
- final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
- final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
+ public void shouldGetSameKeyAsPeekNextReverseRange() {
+ final ThreadCache cache = setupThreadCache(1, 1, 10000L, true);
+ final Bytes theByte = Bytes.wrap(new byte[]{1});
+ final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte);
+ assertThat(iterator.peekNextKey(), is(iterator.next().key));
+ }
+
+ private void shouldThrowIfNoPeekNextKey(final Supplier<ThreadCache.MemoryLRUCacheBytesIterator> methodUnderTest) {
+ final ThreadCache.MemoryLRUCacheBytesIterator iterator = methodUnderTest.get();
assertThrows(NoSuchElementException.class, iterator::peekNextKey);
}
@Test
+ public void shouldThrowIfNoPeekNextKeyRange() {
+ final ThreadCache cache = setupThreadCache(0, 0, 10000L, false);
+ shouldThrowIfNoPeekNextKey(() -> cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})));
+ }
+
+ @Test
+ public void shouldThrowIfNoPeekNextKeyReverseRange() {
+ final ThreadCache cache = setupThreadCache(-1, 0, 10000L, true);
+ shouldThrowIfNoPeekNextKey(() -> cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})));
+ }
+
+ @Test
public void shouldReturnFalseIfNoNextKey() {
- final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
+ final ThreadCache cache = setupThreadCache(0, 0, 10000L, false);
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
assertFalse(iterator.hasNext());
}
@Test
+ public void shouldReturnFalseIfNoNextKeyReverseRange() {
+ final ThreadCache cache = setupThreadCache(-1, 0, 10000L, true);
+ final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
public void shouldPeekAndIterateOverRange() {
- final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
- final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
- for (final byte[] aByte : bytes) {
- cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte));
- }
+ final ThreadCache cache = setupThreadCache(0, 10, 10000L, false);
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}));
int bytesIndex = 1;
while (iterator.hasNext()) {
@@ -286,12 +332,8 @@ public class ThreadCacheTest {
}
@Test
- public void shouldSkipToEntryWhentoInclusiveIsFalseInRange() {
- final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
- final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
- for (final byte[] aByte : bytes) {
- cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte));
- }
+ public void shouldSkipToEntryWhenToInclusiveIsFalseInRange() {
+ final ThreadCache cache = setupThreadCache(0, 10, 10000L, false);
final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}), false);
int bytesIndex = 1;
while (iterator.hasNext()) {
@@ -305,26 +347,95 @@ public class ThreadCacheTest {
}
@Test
+ public void shouldPeekAndIterateOverReverseRange() {
+ final ThreadCache cache = setupThreadCache(10, 0, 10000L, true);
+ final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}));
+ int bytesIndex = 4;
+ while (iterator.hasNext()) {
+ final Bytes peekedKey = iterator.peekNextKey();
+ final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
+ assertArrayEquals(bytes[bytesIndex], peekedKey.get());
+ assertArrayEquals(bytes[bytesIndex], next.key.get());
+ bytesIndex--;
+ }
+ assertEquals(0, bytesIndex);
+ }
+
+ @Test
public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
- final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, new MockStreamsMetrics(new Metrics()));
- cache.addDirtyEntryFlushListener(namespace, dirty -> { });
-
- final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}};
- for (int i = 0; i < 5; i++) {
- cache.put(namespace, Bytes.wrap(bytes[i]), dirtyEntry(bytes[i]));
- }
+ final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5, false);
assertEquals(5, cache.size());
-
// should evict byte[] {0}
cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
-
final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
+ assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
+ }
+
+ @Test
+ public void shouldSkipEntriesWhereValueHasBeenEvictedFromCacheReverseRange() {
+ final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+ final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5, true);
+ assertEquals(5, cache.size());
+ // should evict byte[] {4}
+ cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
+ final ThreadCache.MemoryLRUCacheBytesIterator range = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
+ assertEquals(Bytes.wrap(new byte[]{3}), range.peekNextKey());
+ }
+
+ @Test
+ public void shouldFetchAllEntriesInCache() {
+ final ThreadCache cache = setupThreadCache(0, 11, 10000L, false);
+ final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.all(namespace);
+ int bytesIndex = 0;
+ while (iterator.hasNext()) {
+ final Bytes peekedKey = iterator.peekNextKey();
+ final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
+ assertArrayEquals(bytes[bytesIndex], peekedKey.get());
+ assertArrayEquals(bytes[bytesIndex], next.key.get());
+ bytesIndex++;
+ }
+ assertEquals(11, bytesIndex);
+ }
+ @Test
+ public void shouldFetchAllEntriesInCacheInReverseOrder() {
+ final ThreadCache cache = setupThreadCache(10, 0, 10000L, true);
+ final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseAll(namespace);
+ int bytesIndex = 10;
+ while (iterator.hasNext()) {
+ final Bytes peekedKey = iterator.peekNextKey();
+ final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
+ assertArrayEquals(bytes[bytesIndex], peekedKey.get());
+ assertArrayEquals(bytes[bytesIndex], next.key.get());
+ bytesIndex--;
+ }
+ assertEquals(-1, bytesIndex);
+ }
+
+ @Test
+ public void shouldReturnAllUnevictedValuesFromCache() {
+ final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+ final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5, false);
+ assertEquals(5, cache.size());
+ // should evict byte[] {0}
+ cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
+ final ThreadCache.MemoryLRUCacheBytesIterator range = cache.all(namespace);
assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
}
@Test
+ public void shouldReturnAllUnevictedValuesFromCacheInReverseOrder() {
+ final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+ final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5, true);
+ assertEquals(5, cache.size());
+ // should evict byte[] {4}
+ cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
+ final ThreadCache.MemoryLRUCacheBytesIterator range = cache.reverseAll(namespace);
+ assertEquals(Bytes.wrap(new byte[]{6}), range.peekNextKey());
+ }
+
+ @Test
public void shouldFlushDirtyEntriesForNamespace() {
final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
final List<byte[]> received = new ArrayList<>();
@@ -394,7 +505,7 @@ public class ThreadCacheTest {
cache.addDirtyEntryFlushListener(namespace, received::addAll);
cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
- KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
+ KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
assertEquals(cache.evicts(), 2);
assertEquals(received.size(), 2);
@@ -405,7 +516,7 @@ public class ThreadCacheTest {
final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
- KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
+ KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new byte[]{0})).value());
assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new byte[]{1})).value());