You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/22 13:28:17 UTC
[kafka] branch trunk updated: MINOR: Clean up ThreadCacheTest (#6485)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1acae2a MINOR: Clean up ThreadCacheTest (#6485)
1acae2a is described below
commit 1acae2a67c8fce071bfe7a373187bdff209f1705
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri Mar 22 09:27:58 2019 -0400
MINOR: Clean up ThreadCacheTest (#6485)
Minor clean up of ThreadCacheTest
Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>
---
.../streams/state/internals/ThreadCacheTest.java | 96 +++++-----------------
1 file changed, 22 insertions(+), 74 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 5882ee4..c9c5789 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -146,7 +146,7 @@ public class ThreadCacheTest {
}
@Test
- public void evict() throws IOException {
+ public void evict() {
final List<KeyValue<String, String>> received = new ArrayList<>();
final List<KeyValue<String, String>> expected = Collections.singletonList(
new KeyValue<>("K1", "V1"));
@@ -161,14 +161,10 @@ public class ThreadCacheTest {
final ThreadCache cache = new ThreadCache(logContext,
memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
new MockStreamsMetrics(new Metrics()));
- cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
- for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
- received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue())));
- }
+ cache.addDirtyEntryFlushListener(namespace, dirty -> {
+ for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
+ received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue())));
}
-
});
for (final KeyValue<String, String> kvToInsert : toInsert) {
@@ -200,12 +196,7 @@ public class ThreadCacheTest {
final Bytes key = Bytes.wrap(new byte[]{0});
final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
- cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
- received.addAll(dirty);
- }
- });
+ cache.addDirtyEntryFlushListener(namespace, received::addAll);
cache.put(namespace, key, dirtyEntry(key.get()));
assertEquals(key.get(), cache.delete(namespace, key).value());
@@ -298,12 +289,8 @@ public class ThreadCacheTest {
public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, new MockStreamsMetrics(new Metrics()));
- cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+ cache.addDirtyEntryFlushListener(namespace, dirty -> { });
- }
- });
final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}};
for (int i = 0; i < 5; i++) {
cache.put(namespace, Bytes.wrap(bytes[i]), dirtyEntry(bytes[i]));
@@ -322,12 +309,9 @@ public class ThreadCacheTest {
public void shouldFlushDirtyEntriesForNamespace() {
final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
final List<byte[]> received = new ArrayList<>();
- cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
- for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
- received.add(dirtyEntry.key().get());
- }
+ cache.addDirtyEntryFlushListener(namespace1, dirty -> {
+ for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
+ received.add(dirtyEntry.key().get());
}
});
final List<byte[]> expected = Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2});
@@ -344,12 +328,9 @@ public class ThreadCacheTest {
public void shouldNotFlushCleanEntriesForNamespace() {
final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
final List<byte[]> received = new ArrayList<>();
- cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
- for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
- received.add(dirtyEntry.key().get());
- }
+ cache.addDirtyEntryFlushListener(namespace1, dirty -> {
+ for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
+ received.add(dirtyEntry.key().get());
}
});
final List<byte[]> toInsert = Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2});
@@ -366,12 +347,7 @@ public class ThreadCacheTest {
private void shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(final ThreadCache cache) {
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
- cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
- received.addAll(dirty);
- }
- });
+ cache.addDirtyEntryFlushListener(namespace, received::addAll);
cache.put(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{0}));
assertEquals(1, received.size());
@@ -396,12 +372,7 @@ public class ThreadCacheTest {
public void shouldEvictAfterPutAll() {
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics()));
- cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
- received.addAll(dirty);
- }
- });
+ cache.addDirtyEntryFlushListener(namespace, received::addAll);
cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
@@ -425,12 +396,7 @@ public class ThreadCacheTest {
public void shouldNotForwardCleanEntryOnEviction() {
final ThreadCache cache = new ThreadCache(logContext, 0, new MockStreamsMetrics(new Metrics()));
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
- cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
- received.addAll(dirty);
- }
- });
+ cache.addDirtyEntryFlushListener(namespace, received::addAll);
cache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[]{0}));
assertEquals(0, received.size());
}
@@ -448,12 +414,7 @@ public class ThreadCacheTest {
public void shouldEvictAfterPutIfAbsent() {
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics()));
- cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
- received.addAll(dirty);
- }
- });
+ cache.addDirtyEntryFlushListener(namespace, received::addAll);
cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5}));
cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}));
@@ -468,26 +429,13 @@ public class ThreadCacheTest {
final int maxCacheSizeInBytes = 100;
final ThreadCache threadCache = new ThreadCache(logContext, maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics()));
// trigger a put into another cache on eviction from "name"
- threadCache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
- // put an item into an empty cache when the total cache size
- // is already > than maxCacheSizeBytes
- threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2]));
- }
- });
- threadCache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
- //
- }
- });
- threadCache.addDirtyEntryFlushListener(namespace2, new ThreadCache.DirtyEntryFlushListener() {
- @Override
- public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-
- }
+ threadCache.addDirtyEntryFlushListener(namespace, dirty -> {
+ // put an item into an empty cache when the total cache size
+ // is already > than maxCacheSizeBytes
+ threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2]));
});
+ threadCache.addDirtyEntryFlushListener(namespace1, dirty -> { });
+ threadCache.addDirtyEntryFlushListener(namespace2, dirty -> { });
threadCache.put(namespace2, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));
threadCache.put(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));