You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/22 13:28:17 UTC

[kafka] branch trunk updated: MINOR: Clean up ThreadCacheTest (#6485)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1acae2a  MINOR: Clean up ThreadCacheTest (#6485)
1acae2a is described below

commit 1acae2a67c8fce071bfe7a373187bdff209f1705
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Fri Mar 22 09:27:58 2019 -0400

    MINOR: Clean up ThreadCacheTest (#6485)
    
    Minor clean up ofThreadCacheTest
    Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>
---
 .../streams/state/internals/ThreadCacheTest.java   | 96 +++++-----------------
 1 file changed, 22 insertions(+), 74 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 5882ee4..c9c5789 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -146,7 +146,7 @@ public class ThreadCacheTest {
     }
 
     @Test
-    public void evict() throws IOException {
+    public void evict() {
         final List<KeyValue<String, String>> received = new ArrayList<>();
         final List<KeyValue<String, String>> expected = Collections.singletonList(
                 new KeyValue<>("K1", "V1"));
@@ -161,14 +161,10 @@ public class ThreadCacheTest {
         final ThreadCache cache = new ThreadCache(logContext,
                                                   memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
                                                   new MockStreamsMetrics(new Metrics()));
-        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
-                    received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue())));
-                }
+        cache.addDirtyEntryFlushListener(namespace, dirty -> {
+            for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
+                received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue())));
             }
-
         });
 
         for (final KeyValue<String, String> kvToInsert : toInsert) {
@@ -200,12 +196,7 @@ public class ThreadCacheTest {
         final Bytes key = Bytes.wrap(new byte[]{0});
         final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                received.addAll(dirty);
-            }
-        });
+        cache.addDirtyEntryFlushListener(namespace, received::addAll);
         cache.put(namespace, key, dirtyEntry(key.get()));
         assertEquals(key.get(), cache.delete(namespace, key).value());
 
@@ -298,12 +289,8 @@ public class ThreadCacheTest {
     public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
         final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
         final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, new MockStreamsMetrics(new Metrics()));
-        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+        cache.addDirtyEntryFlushListener(namespace, dirty -> { });
 
-            }
-        });
         final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}};
         for (int i = 0; i < 5; i++) {
             cache.put(namespace, Bytes.wrap(bytes[i]), dirtyEntry(bytes[i]));
@@ -322,12 +309,9 @@ public class ThreadCacheTest {
     public void shouldFlushDirtyEntriesForNamespace() {
         final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
         final List<byte[]> received = new ArrayList<>();
-        cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
-                    received.add(dirtyEntry.key().get());
-                }
+        cache.addDirtyEntryFlushListener(namespace1, dirty -> {
+            for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
+                received.add(dirtyEntry.key().get());
             }
         });
         final List<byte[]> expected = Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2});
@@ -344,12 +328,9 @@ public class ThreadCacheTest {
     public void shouldNotFlushCleanEntriesForNamespace() {
         final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
         final List<byte[]> received = new ArrayList<>();
-        cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
-                    received.add(dirtyEntry.key().get());
-                }
+        cache.addDirtyEntryFlushListener(namespace1, dirty -> {
+            for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
+                received.add(dirtyEntry.key().get());
             }
         });
         final List<byte[]> toInsert =  Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2});
@@ -366,12 +347,7 @@ public class ThreadCacheTest {
     private void shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(final ThreadCache cache) {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
 
-        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                received.addAll(dirty);
-            }
-        });
+        cache.addDirtyEntryFlushListener(namespace, received::addAll);
         cache.put(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{0}));
         assertEquals(1, received.size());
 
@@ -396,12 +372,7 @@ public class ThreadCacheTest {
     public void shouldEvictAfterPutAll() {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics()));
-        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                received.addAll(dirty);
-            }
-        });
+        cache.addDirtyEntryFlushListener(namespace, received::addAll);
 
         cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
                                               KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
@@ -425,12 +396,7 @@ public class ThreadCacheTest {
     public void shouldNotForwardCleanEntryOnEviction() {
         final ThreadCache cache = new ThreadCache(logContext, 0, new MockStreamsMetrics(new Metrics()));
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
-        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                received.addAll(dirty);
-            }
-        });
+        cache.addDirtyEntryFlushListener(namespace, received::addAll);
         cache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[]{0}));
         assertEquals(0, received.size());
     }
@@ -448,12 +414,7 @@ public class ThreadCacheTest {
     public void shouldEvictAfterPutIfAbsent() {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics()));
-        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                received.addAll(dirty);
-            }
-        });
+        cache.addDirtyEntryFlushListener(namespace, received::addAll);
 
         cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5}));
         cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}));
@@ -468,26 +429,13 @@ public class ThreadCacheTest {
         final int maxCacheSizeInBytes = 100;
         final ThreadCache threadCache = new ThreadCache(logContext, maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics()));
         // trigger a put into another cache on eviction from "name"
-        threadCache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-                // put an item into an empty cache when the total cache size
-                // is already > than maxCacheSizeBytes
-                threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2]));
-            }
-        });
-        threadCache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-               //
-            }
-        });
-        threadCache.addDirtyEntryFlushListener(namespace2, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
-
-            }
+        threadCache.addDirtyEntryFlushListener(namespace, dirty -> {
+            // put an item into an empty cache when the total cache size
+            // is already > than maxCacheSizeBytes
+            threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2]));
         });
+        threadCache.addDirtyEntryFlushListener(namespace1, dirty -> { });
+        threadCache.addDirtyEntryFlushListener(namespace2, dirty -> { });
 
         threadCache.put(namespace2, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));
         threadCache.put(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));