You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/12/21 12:38:13 UTC

[kafka] branch trunk updated: MINOR: Fix various memory leaks in tests (#12959)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 26daa8d610c MINOR: Fix various memory leaks in tests (#12959)
26daa8d610c is described below

commit 26daa8d610ca92ff8d4dd37420b61c960af478e1
Author: Lucas Brutschy <lu...@users.noreply.github.com>
AuthorDate: Wed Dec 21 13:38:05 2022 +0100

    MINOR: Fix various memory leaks in tests (#12959)
    
    Various tests in the Streams module were leaking native memory.
    
    Most tests were fixed by closing the corresponding rocksdb resource.
    
    I verified that each corresponding leak is gone by running the tests against
    an earlier RocksDB release with finalizers and checking whether the finalizers
    were eventually called.
    
    Reviewer: Bruno Cadonna <ca...@apache.org>
---
 ...scriptionStoreReceiveProcessorSupplierTest.java | 21 +++++++------
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |  1 +
 ...ockBasedTableConfigWithAccessibleCacheTest.java |  9 +++---
 ...sToDbOptionsColumnFamilyOptionsAdapterTest.java | 34 +++++++++++++---------
 4 files changed, 38 insertions(+), 27 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java
index 17f34d56d3f..59636556c40 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java
@@ -50,6 +50,7 @@ public class SubscriptionStoreReceiveProcessorSupplierTest {
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private File stateDir;
     private MockInternalNewProcessorContext<CombinedKey<String, String>, Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> context;
+    private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = null;
 
     private static final String FK = "fk1";
     private static final String PK1 = "pk1";
@@ -71,6 +72,9 @@ public class SubscriptionStoreReceiveProcessorSupplierTest {
 
     @After
     public void after() throws IOException {
+        if (stateStore != null) {
+            stateStore.close();
+        }
         Utils.delete(stateDir);
     }
 
@@ -88,7 +92,7 @@ public class SubscriptionStoreReceiveProcessorSupplierTest {
                         SubscriptionWrapper<String>,
                         CombinedKey<String, String>,
                         Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get();
-        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build();
+        stateStore = storeBuilder.build();
         context.addStateStore(stateStore);
         stateStore.init((StateStoreContext) context, stateStore);
 
@@ -140,10 +144,9 @@ public class SubscriptionStoreReceiveProcessorSupplierTest {
             SubscriptionWrapper<String>,
             CombinedKey<String, String>,
             Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get();
-        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build();
+        stateStore = storeBuilder.build();
         context.addStateStore(stateStore);
         stateStore.init((StateStoreContext) context, stateStore);
-
         final SubscriptionWrapper<String> oldWrapper = new SubscriptionWrapper<>(
             new long[]{1L, 2L},
             Instruction.DELETE_KEY_AND_PROPAGATE,
@@ -192,7 +195,7 @@ public class SubscriptionStoreReceiveProcessorSupplierTest {
             SubscriptionWrapper<String>,
             CombinedKey<String, String>,
             Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get();
-        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build();
+        stateStore = storeBuilder.build();
         context.addStateStore(stateStore);
         stateStore.init((StateStoreContext) context, stateStore);
 
@@ -244,7 +247,7 @@ public class SubscriptionStoreReceiveProcessorSupplierTest {
             SubscriptionWrapper<String>,
             CombinedKey<String, String>,
             Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get();
-        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build();
+        stateStore = storeBuilder.build();
         context.addStateStore(stateStore);
         stateStore.init((StateStoreContext) context, stateStore);
 
@@ -296,7 +299,7 @@ public class SubscriptionStoreReceiveProcessorSupplierTest {
             SubscriptionWrapper<String>,
             CombinedKey<String, String>,
             Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get();
-        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build();
+        stateStore = storeBuilder.build();
         context.addStateStore(stateStore);
         stateStore.init((StateStoreContext) context, stateStore);
 
@@ -348,7 +351,7 @@ public class SubscriptionStoreReceiveProcessorSupplierTest {
             SubscriptionWrapper<String>,
             CombinedKey<String, String>,
             Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get();
-        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build();
+        stateStore = storeBuilder.build();
         context.addStateStore(stateStore);
         stateStore.init((StateStoreContext) context, stateStore);
 
@@ -400,7 +403,7 @@ public class SubscriptionStoreReceiveProcessorSupplierTest {
             SubscriptionWrapper<String>,
             CombinedKey<String, String>,
             Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get();
-        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build();
+        stateStore = storeBuilder.build();
         context.addStateStore(stateStore);
         stateStore.init((StateStoreContext) context, stateStore);
 
@@ -452,7 +455,7 @@ public class SubscriptionStoreReceiveProcessorSupplierTest {
             SubscriptionWrapper<String>,
             CombinedKey<String, String>,
             Change<ValueAndTimestamp<SubscriptionWrapper<String>>>> processor = supplier.get();
-        final TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>> stateStore = storeBuilder.build();
+        stateStore = storeBuilder.build();
         context.addStateStore(stateStore);
         stateStore.init((StateStoreContext) context, stateStore);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 6e1b3bfcf8c..852f6fd3ceb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -480,6 +480,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         assertEquals(2, writeBatchMap.size());
         for (final WriteBatch batch : writeBatchMap.values()) {
             assertEquals(1, batch.count());
+            batch.close();
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/BlockBasedTableConfigWithAccessibleCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/BlockBasedTableConfigWithAccessibleCacheTest.java
index 1904789ec72..ffcbef23ce8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/BlockBasedTableConfigWithAccessibleCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/BlockBasedTableConfigWithAccessibleCacheTest.java
@@ -44,11 +44,12 @@ public class BlockBasedTableConfigWithAccessibleCacheTest {
     public void shouldSetBlockCacheAndMakeItAccessible() {
         final BlockBasedTableConfigWithAccessibleCache configWithAccessibleCache =
             new BlockBasedTableConfigWithAccessibleCache();
-        final Cache blockCache = new LRUCache(1024);
+        try (final Cache blockCache = new LRUCache(1024)) {
 
-        final BlockBasedTableConfig updatedConfig = configWithAccessibleCache.setBlockCache(blockCache);
+            final BlockBasedTableConfig updatedConfig = configWithAccessibleCache.setBlockCache(blockCache);
 
-        assertThat(updatedConfig, sameInstance(configWithAccessibleCache));
-        assertThat(configWithAccessibleCache.blockCache(), sameInstance(blockCache));
+            assertThat(updatedConfig, sameInstance(configWithAccessibleCache));
+            assertThat(configWithAccessibleCache.blockCache(), sameInstance(blockCache));
+        }
     }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
index 8eec0a40056..727f46f81a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
@@ -64,6 +64,7 @@ import java.util.stream.Collectors;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.resetToNice;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -148,6 +149,9 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
             assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class));
             assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
                 matchesPattern("Unexpected method call DBOptions\\." + method.getName() + "((.*\n*)*):"));
+        } finally {
+            resetToNice(mockedDbOptions);
+            optionsFacadeDbOptions.close();
         }
     }
 
@@ -253,6 +257,9 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
             assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class));
             assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
                 matchesPattern("Unexpected method call ColumnFamilyOptions\\." + method.getName() +  "(.*)"));
+        } finally {
+            resetToNice(mockedColumnFamilyOptions);
+            optionsFacadeColumnFamilyOptions.close();
         }
     }
 
@@ -333,24 +340,23 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class)) {
 
-            final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter
-                = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions());
-
-            for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) {
-                if (walRelatedMethods.contains(method.getName())) {
-                    method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes()));
+            try (RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter =
+                     new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions())) {
+                for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) {
+                    if (walRelatedMethods.contains(method.getName())) {
+                        method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes()));
+                    }
                 }
-            }
 
-            final List<String> walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds");
+                final List<String> walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds");
 
-            final Set<String> logMessages = appender.getEvents().stream()
-                .filter(e -> e.getLevel().equals("WARN"))
-                .map(LogCaptureAppender.Event::getMessage)
-                .collect(Collectors.toSet());
-
-            walOptions.forEach(option -> assertThat(logMessages, hasItem(String.format("WAL is explicitly disabled by Streams in RocksDB. Setting option '%s' will be ignored", option))));
+                final Set<String> logMessages = appender.getEvents().stream()
+                    .filter(e -> e.getLevel().equals("WARN"))
+                    .map(LogCaptureAppender.Event::getMessage)
+                    .collect(Collectors.toSet());
 
+                walOptions.forEach(option -> assertThat(logMessages, hasItem(String.format("WAL is explicitly disabled by Streams in RocksDB. Setting option '%s' will be ignored", option))));
+            }
         }
     }
 }