You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2021/05/05 10:28:54 UTC

[kafka] branch trunk updated: KAFKA-10767: Adding test cases for all, reverseAll and reverseRange for ThreadCache (#9779)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9a71468  KAFKA-10767: Adding test cases for all, reverseAll and reverseRange for ThreadCache (#9779)
9a71468 is described below

commit 9a71468cb0e5fc9faeba39b56f36d0c93ca73c59
Author: vamossagar12 <sa...@gmail.com>
AuthorDate: Wed May 5 15:56:51 2021 +0530

    KAFKA-10767: Adding test cases for all, reverseAll and reverseRange for ThreadCache (#9779)
    
    The tests for ThreadCache did not include unit tests for the all, reverseAll and reverseRange methods. This PR adds them.
    
    Reviewers: Bruno Cadonna <ca...@apache.org>
---
 .../streams/state/internals/ThreadCacheTest.java   | 225 +++++++++++++++------
 1 file changed, 168 insertions(+), 57 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index afd5449..c449de9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import static org.hamcrest.MatcherAssert.assertThat;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -29,7 +30,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.function.Supplier;
 
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -42,19 +45,20 @@ public class ThreadCacheTest {
     final String namespace1 = "0.1-namespace";
     final String namespace2 = "0.2-namespace";
     private final LogContext logContext = new LogContext("testCache ");
+    private final byte[][] bytes = new byte[][]{{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
 
     @Test
     public void basicPutGet() {
         final List<KeyValue<String, String>> toInsert = Arrays.asList(
-                new KeyValue<>("K1", "V1"),
-                new KeyValue<>("K2", "V2"),
-                new KeyValue<>("K3", "V3"),
-                new KeyValue<>("K4", "V4"),
-                new KeyValue<>("K5", "V5"));
+            new KeyValue<>("K1", "V1"),
+            new KeyValue<>("K2", "V2"),
+            new KeyValue<>("K3", "V3"),
+            new KeyValue<>("K4", "V4"),
+            new KeyValue<>("K5", "V5"));
         final KeyValue<String, String> kv = toInsert.get(0);
         final ThreadCache cache = new ThreadCache(logContext,
-                                                  toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
-                                                  new MockStreamsMetrics(new Metrics()));
+            toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
+            new MockStreamsMetrics(new Metrics()));
 
         for (final KeyValue<String, String> kvToInsert : toInsert) {
             final Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
@@ -132,35 +136,35 @@ public class ThreadCacheTest {
 
     static int memoryCacheEntrySize(final byte[] key, final byte[] value, final String topic) {
         return key.length +
-                value.length +
-                1 + // isDirty
-                8 + // timestamp
-                8 + // offset
-                4 +
-                topic.length() +
-                // LRU Node entries
-                key.length +
-                8 + // entry
-                8 + // previous
-                8; // next
+            value.length +
+            1 + // isDirty
+            8 + // timestamp
+            8 + // offset
+            4 +
+            topic.length() +
+            // LRU Node entries
+            key.length +
+            8 + // entry
+            8 + // previous
+            8; // next
     }
 
     @Test
     public void evict() {
         final List<KeyValue<String, String>> received = new ArrayList<>();
         final List<KeyValue<String, String>> expected = Collections.singletonList(
-                new KeyValue<>("K1", "V1"));
+            new KeyValue<>("K1", "V1"));
 
         final List<KeyValue<String, String>> toInsert = Arrays.asList(
-                new KeyValue<>("K1", "V1"),
-                new KeyValue<>("K2", "V2"),
-                new KeyValue<>("K3", "V3"),
-                new KeyValue<>("K4", "V4"),
-                new KeyValue<>("K5", "V5"));
+            new KeyValue<>("K1", "V1"),
+            new KeyValue<>("K2", "V2"),
+            new KeyValue<>("K3", "V3"),
+            new KeyValue<>("K4", "V4"),
+            new KeyValue<>("K5", "V5"));
         final KeyValue<String, String> kv = toInsert.get(0);
         final ThreadCache cache = new ThreadCache(logContext,
-                                                  memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
-                                                  new MockStreamsMetrics(new Metrics()));
+            memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
+            new MockStreamsMetrics(new Metrics()));
         cache.addDirtyEntryFlushListener(namespace, dirty -> {
             for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
                 received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue())));
@@ -233,46 +237,88 @@ public class ThreadCacheTest {
         assertArrayEquals(name1Byte.get(), cache.get(namespace2, nameByte).value());
     }
 
+    private ThreadCache setupThreadCache(final int first, final int last, final long entrySize, final boolean reverse) {
+        final ThreadCache cache = new ThreadCache(logContext, entrySize, new MockStreamsMetrics(new Metrics()));
+        cache.addDirtyEntryFlushListener(namespace, dirty -> { });
+        int index = first;
+        while ((!reverse && index < last) || (reverse && index >= last)) {
+            cache.put(namespace, Bytes.wrap(bytes[index]), dirtyEntry(bytes[index]));
+            if (!reverse)
+                index++;
+            else
+                index--;
+        }
+        return cache;
+    }
+
     @Test
     public void shouldPeekNextKey() {
-        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = setupThreadCache(0, 1, 10000L, false);
         final Bytes theByte = Bytes.wrap(new byte[]{0});
-        cache.put(namespace, theByte, dirtyEntry(theByte.get()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1}));
         assertEquals(theByte, iterator.peekNextKey());
         assertEquals(theByte, iterator.peekNextKey());
     }
 
     @Test
+    public void shouldPeekNextKeyReverseRange() {
+        final ThreadCache cache = setupThreadCache(1, 1, 10000L, true);
+        final Bytes theByte = Bytes.wrap(new byte[]{1});
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte);
+        assertThat(iterator.peekNextKey(), is(theByte));
+        assertThat(iterator.peekNextKey(), is(theByte));
+    }
+
+    @Test
     public void shouldGetSameKeyAsPeekNext() {
-        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = setupThreadCache(0, 1, 10000L, false);
         final Bytes theByte = Bytes.wrap(new byte[]{0});
-        cache.put(namespace, theByte, dirtyEntry(theByte.get()));
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1}));
-        assertEquals(iterator.peekNextKey(), iterator.next().key);
+        assertThat(iterator.peekNextKey(), is(iterator.next().key));
     }
 
     @Test
-    public void shouldThrowIfNoPeekNextKey() {
-        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
-        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
+    public void shouldGetSameKeyAsPeekNextReverseRange() {
+        final ThreadCache cache = setupThreadCache(1, 1, 10000L, true);
+        final Bytes theByte = Bytes.wrap(new byte[]{1});
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte);
+        assertThat(iterator.peekNextKey(), is(iterator.next().key));
+    }
+
+    private void shouldThrowIfNoPeekNextKey(final Supplier<ThreadCache.MemoryLRUCacheBytesIterator> methodUnderTest) {
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = methodUnderTest.get();
         assertThrows(NoSuchElementException.class, iterator::peekNextKey);
     }
 
     @Test
+    public void shouldThrowIfNoPeekNextKeyRange() {
+        final ThreadCache cache = setupThreadCache(0, 0, 10000L, false);
+        shouldThrowIfNoPeekNextKey(() -> cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})));
+    }
+
+    @Test
+    public void shouldThrowIfNoPeekNextKeyReverseRange() {
+        final ThreadCache cache = setupThreadCache(-1, 0, 10000L, true);
+        shouldThrowIfNoPeekNextKey(() -> cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})));
+    }
+
+    @Test
     public void shouldReturnFalseIfNoNextKey() {
-        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
+        final ThreadCache cache = setupThreadCache(0, 0, 10000L, false);
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
         assertFalse(iterator.hasNext());
     }
 
     @Test
+    public void shouldReturnFalseIfNoNextKeyReverseRange() {
+        final ThreadCache cache = setupThreadCache(-1, 0, 10000L, true);
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
     public void shouldPeekAndIterateOverRange() {
-        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
-        final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
-        for (final byte[] aByte : bytes) {
-            cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte));
-        }
+        final ThreadCache cache = setupThreadCache(0, 10, 10000L, false);
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}));
         int bytesIndex = 1;
         while (iterator.hasNext()) {
@@ -286,12 +332,8 @@ public class ThreadCacheTest {
     }
 
     @Test
-    public void shouldSkipToEntryWhentoInclusiveIsFalseInRange() {
-        final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
-        final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
-        for (final byte[] aByte : bytes) {
-            cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte));
-        }
+    public void shouldSkipToEntryWhenToInclusiveIsFalseInRange() {
+        final ThreadCache cache = setupThreadCache(0, 10, 10000L, false);
         final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}), false);
         int bytesIndex = 1;
         while (iterator.hasNext()) {
@@ -305,26 +347,95 @@ public class ThreadCacheTest {
     }
 
     @Test
+    public void shouldPeekAndIterateOverReverseRange() {
+        final ThreadCache cache = setupThreadCache(10, 0, 10000L, true);
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}));
+        int bytesIndex = 4;
+        while (iterator.hasNext()) {
+            final Bytes peekedKey = iterator.peekNextKey();
+            final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
+            assertArrayEquals(bytes[bytesIndex], peekedKey.get());
+            assertArrayEquals(bytes[bytesIndex], next.key.get());
+            bytesIndex--;
+        }
+        assertEquals(0, bytesIndex);
+    }
+
+    @Test
     public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
         final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
-        final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, new MockStreamsMetrics(new Metrics()));
-        cache.addDirtyEntryFlushListener(namespace, dirty -> { });
-
-        final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}};
-        for (int i = 0; i < 5; i++) {
-            cache.put(namespace, Bytes.wrap(bytes[i]), dirtyEntry(bytes[i]));
-        }
+        final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5, false);
         assertEquals(5, cache.size());
-
         // should evict byte[] {0}
         cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
-
         final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
+        assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
+    }
+
+    @Test
+    public void shouldSkipEntriesWhereValueHasBeenEvictedFromCacheReverseRange() {
+        final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+        final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5, true);
+        assertEquals(5, cache.size());
+        // should evict byte[] {4}
+        cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
+        final ThreadCache.MemoryLRUCacheBytesIterator range = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
+        assertEquals(Bytes.wrap(new byte[]{3}), range.peekNextKey());
+    }
+
+    @Test
+    public void shouldFetchAllEntriesInCache() {
+        final ThreadCache cache = setupThreadCache(0, 11, 10000L, false);
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.all(namespace);
+        int bytesIndex = 0;
+        while (iterator.hasNext()) {
+            final Bytes peekedKey = iterator.peekNextKey();
+            final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
+            assertArrayEquals(bytes[bytesIndex], peekedKey.get());
+            assertArrayEquals(bytes[bytesIndex], next.key.get());
+            bytesIndex++;
+        }
+        assertEquals(11, bytesIndex);
+    }
 
+    @Test
+    public void shouldFetchAllEntriesInCacheInReverseOrder() {
+        final ThreadCache cache = setupThreadCache(10, 0, 10000L, true);
+        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseAll(namespace);
+        int bytesIndex = 10;
+        while (iterator.hasNext()) {
+            final Bytes peekedKey = iterator.peekNextKey();
+            final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
+            assertArrayEquals(bytes[bytesIndex], peekedKey.get());
+            assertArrayEquals(bytes[bytesIndex], next.key.get());
+            bytesIndex--;
+        }
+        assertEquals(-1, bytesIndex);
+    }
+
+    @Test
+    public void shouldReturnAllUnevictedValuesFromCache() {
+        final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+        final ThreadCache cache = setupThreadCache(0, 5, entrySize * 5, false);
+        assertEquals(5, cache.size());
+        // should evict byte[] {0}
+        cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
+        final ThreadCache.MemoryLRUCacheBytesIterator range = cache.all(namespace);
         assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
     }
 
     @Test
+    public void shouldReturnAllUnevictedValuesFromCacheInReverseOrder() {
+        final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
+        final ThreadCache cache = setupThreadCache(4, 0, entrySize * 5, true);
+        assertEquals(5, cache.size());
+        // should evict byte[] {4}
+        cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
+        final ThreadCache.MemoryLRUCacheBytesIterator range = cache.reverseAll(namespace);
+        assertEquals(Bytes.wrap(new byte[]{6}), range.peekNextKey());
+    }
+
+    @Test
     public void shouldFlushDirtyEntriesForNamespace() {
         final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
         final List<byte[]> received = new ArrayList<>();
@@ -394,7 +505,7 @@ public class ThreadCacheTest {
         cache.addDirtyEntryFlushListener(namespace, received::addAll);
 
         cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
-                                              KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
+            KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
 
         assertEquals(cache.evicts(), 2);
         assertEquals(received.size(), 2);
@@ -405,7 +516,7 @@ public class ThreadCacheTest {
         final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
 
         cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
-                                           KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
+            KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
 
         assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new byte[]{0})).value());
         assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new byte[]{1})).value());