Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/08 15:28:56 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #12959: MINOR: Fix various memory leaks in tests

cadonna commented on code in PR #12959:
URL: https://github.com/apache/kafka/pull/12959#discussion_r1043244790


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java:
##########
@@ -130,6 +134,8 @@ public void shouldDeleteKeyAndPropagateV0() {
                   .withValue(new Change<>(newValue, oldValue)),
             forwarded.get(0).record()
         );
+
+        stateStore.close();

Review Comment:
   Why do you close the state store in this test but not in the other tests?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##########
@@ -148,6 +148,11 @@ private void verifyDBOptionsMethodCall(final Method method) throws Exception {
             assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class));
             assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
                 matchesPattern("Unexpected method call DBOptions\\." + method.getName() + "((.*\n*)*):"));
+        } finally {
+            reset(mockedDbOptions);
+            mockedDbOptions.close();
+            replay(mockedDbOptions);
+            optionsFacadeDbOptions.close();

Review Comment:
   I think it would be enough to just call `optionsFacadeDbOptions.close();`. The try-block already verifies that `optionsFacadeDbOptions.close()` calls `mockedDbOptions.close()`, so there is no need to verify it again.
   Alternatively, you could add `close` to the list of ignored methods (`ignoreMethods`) and verify as you did. In that case, however, you need to add `verify(mockedDbOptions)` after `optionsFacadeDbOptions.close()`; otherwise nothing is verified.
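
To illustrate the first option, here is a minimal, self-contained sketch. It does not use EasyMock and is not the code from this PR: `RecordingOptions` and `OptionsFacade` are hypothetical stand-ins for the mocked `DBOptions` and the adapter. The forwarding check lives in the try-block, and the finally-block only releases the resource through the facade.

```java
import java.util.ArrayList;
import java.util.List;

public class FacadeCloseSketch {

    // Hypothetical stand-in for a mocked native resource such as DBOptions.
    // It records every call so the test can inspect them.
    static final class RecordingOptions implements AutoCloseable {
        final List<String> calls = new ArrayList<>();

        void setMaxOpenFiles(final int maxOpenFiles) {
            calls.add("setMaxOpenFiles");
        }

        @Override
        public void close() {
            calls.add("close");
        }
    }

    // Hypothetical stand-in for the adapter under test: every call,
    // including close(), is forwarded to the delegate.
    static final class OptionsFacade implements AutoCloseable {
        private final RecordingOptions delegate;

        OptionsFacade(final RecordingOptions delegate) {
            this.delegate = delegate;
        }

        void setMaxOpenFiles(final int maxOpenFiles) {
            delegate.setMaxOpenFiles(maxOpenFiles);
        }

        @Override
        public void close() {
            delegate.close();
        }
    }

    public static List<String> verifyForwarding() {
        final RecordingOptions recording = new RecordingOptions();
        final OptionsFacade facade = new OptionsFacade(recording);
        try {
            // The actual verification happens here, in the try-block.
            facade.setMaxOpenFiles(10);
            if (!recording.calls.contains("setMaxOpenFiles")) {
                throw new AssertionError("call was not forwarded");
            }
        } finally {
            // Cleanup only: closing the facade also closes the delegate,
            // so nothing needs to be re-verified at this point.
            facade.close();
        }
        return recording.calls;
    }

    public static void main(final String[] args) {
        System.out.println(verifyForwarding()); // prints [setMaxOpenFiles, close]
    }
}
```

If the second option is preferred, the equivalent point in this sketch is that any assertion about `close` has to run after `facade.close()` returns, just as `verify(mockedDbOptions)` has to follow `optionsFacadeDbOptions.close()`.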



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##########
@@ -333,23 +343,23 @@ public void shouldLogWarningWhenSettingWalOptions() throws Exception {
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class)) {
 
-            final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter
-                = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions());
-
-            for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) {
-                if (walRelatedMethods.contains(method.getName())) {
-                    method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes()));
+            try (RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter =
+                     new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions())) {
+                for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) {
+                    if (walRelatedMethods.contains(method.getName())) {
+                        method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes()));
+                    }
                 }
-            }
 
-            final List<String> walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds");
+                final List<String> walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds");
 
-            final Set<String> logMessages = appender.getEvents().stream()
-                .filter(e -> e.getLevel().equals("WARN"))
-                .map(LogCaptureAppender.Event::getMessage)
-                .collect(Collectors.toSet());
+                final Set<String> logMessages = appender.getEvents().stream()
+                    .filter(e -> e.getLevel().equals("WARN"))
+                    .map(LogCaptureAppender.Event::getMessage)
+                    .collect(Collectors.toSet());
 
-            walOptions.forEach(option -> assertThat(logMessages, hasItem(String.format("WAL is explicitly disabled by Streams in RocksDB. Setting option '%s' will be ignored", option))));
+                walOptions.forEach(option -> assertThat(logMessages, hasItem(String.format("WAL is explicitly disabled by Streams in RocksDB. Setting option '%s' will be ignored", option))));
+            }
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java:
##########
@@ -480,6 +480,7 @@ public void shouldCreateWriteBatches() {
         assertEquals(2, writeBatchMap.size());
         for (final WriteBatch batch : writeBatchMap.values()) {
             assertEquals(1, batch.count());
+            batch.close();

Review Comment:
   Nice catch!



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##########
@@ -253,6 +258,11 @@ private void verifyColumnFamilyOptionsMethodCall(final Method method) throws Exc
             assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class));
             assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
                 matchesPattern("Unexpected method call ColumnFamilyOptions\\." + method.getName() +  "(.*)"));
+        } finally {
+            reset(mockedColumnFamilyOptions);

Review Comment:
   See my comments above on `verifyDBOptionsMethodCall`; the same applies here.


