Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/06 17:47:32 UTC

[GitHub] [kafka] lucasbru opened a new pull request, #12959: MINOR: Fix various memory leaks in tests

lucasbru opened a new pull request, #12959:
URL: https://github.com/apache/kafka/pull/12959

   Various tests in the streams part were leaking native memory.
   
   Most tests were fixed by closing the corresponding rocksdb resource.
   
   I tested that the corresponding leak is gone by using a previous rocksdb
   release with finalizers and checking if the finalizers would be called at some
   point. 
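
As an illustration of the kind of fix described above, here is a minimal sketch and not code from the PR. `FakeNativeHandle` is a hypothetical stand-in for a RocksDB object that owns native memory; it only counts open handles so the difference between the leaky and the fixed test shape is visible without RocksDB on the classpath.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CloseInTestSketch {

    // Hypothetical stand-in for a RocksDB object backed by native memory.
    // It tracks how many handles are currently open.
    static final class FakeNativeHandle implements AutoCloseable {
        static final AtomicInteger OPEN = new AtomicInteger();
        private boolean closed = false;

        FakeNativeHandle() {
            OPEN.incrementAndGet();
        }

        @Override
        public void close() {
            // Idempotent: a second close() does not decrement again.
            if (!closed) {
                closed = true;
                OPEN.decrementAndGet();
            }
        }
    }

    // Leaky shape: the handle goes out of scope without close().
    static void leakyTest() {
        final FakeNativeHandle handle = new FakeNativeHandle();
        // ... assertions against handle would go here ...
    }

    // Fixed shape: try-with-resources closes the handle even if an assertion throws.
    static void fixedTest() {
        try (FakeNativeHandle handle = new FakeNativeHandle()) {
            // ... assertions against handle would go here ...
        }
    }

    public static void main(final String[] args) {
        fixedTest();
        System.out.println("open after fixed test: " + FakeNativeHandle.OPEN.get());
        leakyTest();
        System.out.println("open after leaky test: " + FakeNativeHandle.OPEN.get());
    }
}
```

With a real RocksDB object the counter does not exist, which is presumably why the check relied on finalizers in an older release.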
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on pull request #12959: MINOR: Fix various memory leaks in tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on PR #12959:
URL: https://github.com/apache/kafka/pull/12959#issuecomment-1361260371

   Build failures are not related:
   ```
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.StickyAssignorTest.testLargeAssignmentAndGroupWithNonEqualSubscription()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.tools.MetadataQuorumCommandTest.[1] Type=Raft-CoReside, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.4-IV1, Security=PLAINTEXT
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.tools.MetadataQuorumCommandTest.[2] Type=Raft-Distributed, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.4-IV1, Security=PLAINTEXT
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.tools.MetadataQuorumCommandTest.[5] Type=Raft-CoReside, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.4-IV1, Security=PLAINTEXT
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.tools.MetadataQuorumCommandTest.[6] Type=Raft-Distributed, Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.4-IV1, Security=PLAINTEXT
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.tools.MetadataQuorumCommandTest.[1] Type=Raft-CoReside, Name=testDescribeQuorumStatusSuccessful, MetadataVersion=3.4-IV1, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testCreatePartitionsUseProvidedForwardingAdmin()
   Build / JDK 17 and Scala 2.13 / kafka.api.ProducerIdExpirationTest.testProducerIdExpirationWithNoTransactions(String).quorum=zk
   Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
   ```




[GitHub] [kafka] lucasbru commented on a diff in pull request #12959: MINOR: Fix various memory leaks in tests

Posted by GitBox <gi...@apache.org>.
lucasbru commented on code in PR #12959:
URL: https://github.com/apache/kafka/pull/12959#discussion_r1052575316


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java:
##########
@@ -130,6 +134,8 @@ public void shouldDeleteKeyAndPropagateV0() {
                   .withValue(new Change<>(newValue, oldValue)),
             forwarded.get(0).record()
         );
+
+        stateStore.close();

Review Comment:
   yeah, that's a duplicate



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##########
@@ -253,6 +258,11 @@ private void verifyColumnFamilyOptionsMethodCall(final Method method) throws Exc
             assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class));
             assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
                 matchesPattern("Unexpected method call ColumnFamilyOptions\\." + method.getName() +  "(.*)"));
+        } finally {
+            reset(mockedColumnFamilyOptions);

Review Comment:
   Done



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##########
@@ -148,6 +148,11 @@ private void verifyDBOptionsMethodCall(final Method method) throws Exception {
             assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class));
             assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
                 matchesPattern("Unexpected method call DBOptions\\." + method.getName() + "((.*\n*)*):"));
+        } finally {
+            reset(mockedDbOptions);
+            mockedDbOptions.close();
+            replay(mockedDbOptions);
+            optionsFacadeDbOptions.close();

Review Comment:
   I'm just doing this so that easymock doesn't complain about `close` being called on the underlying mock. I want to call close here to free up resources, and the best way here is to tell easymock what to expect on the mocks.
   
   I found an alternative way to do this via `resetToNice` which should be less confusing.





[GitHub] [kafka] cadonna commented on a diff in pull request #12959: MINOR: Fix various memory leaks in tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12959:
URL: https://github.com/apache/kafka/pull/12959#discussion_r1043244790


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplierTest.java:
##########
@@ -130,6 +134,8 @@ public void shouldDeleteKeyAndPropagateV0() {
                   .withValue(new Change<>(newValue, oldValue)),
             forwarded.get(0).record()
         );
+
+        stateStore.close();

Review Comment:
   Why do you close the state store in the test here but not in the other tests?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##########
@@ -148,6 +148,11 @@ private void verifyDBOptionsMethodCall(final Method method) throws Exception {
             assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class));
             assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
                 matchesPattern("Unexpected method call DBOptions\\." + method.getName() + "((.*\n*)*):"));
+        } finally {
+            reset(mockedDbOptions);
+            mockedDbOptions.close();
+            replay(mockedDbOptions);
+            optionsFacadeDbOptions.close();

Review Comment:
   I think it would be enough to just call `optionsFacadeDbOptions.close();`. You already verify in the try-block that `optionsFacadeDbOptions.close();` calls `mockedDbOptions.close();`. No need to verify it again.
   Alternatively, you could add `close` to the list of ignored methods (`ignoreMethods`) and verify as you did. However, you then need to add `verify(mockedDbOptions)` after `optionsFacadeDbOptions.close()`; otherwise nothing is verified.
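
The first option can be sketched without EasyMock. This is a minimal illustration under stated assumptions, not the test from the PR: `RecordingOptions` is a hypothetical hand-written stub standing in for `mockedDbOptions`, and `OptionsFacade` is a hypothetical stand-in for the adapter, assumed to forward `close()` to the object it wraps.

```java
public class FacadeCloseSketch {

    // Hypothetical stub in place of the mocked DBOptions: it only counts close() calls.
    static final class RecordingOptions implements AutoCloseable {
        int closeCalls = 0;

        @Override
        public void close() {
            closeCalls++;
        }
    }

    // Hypothetical stand-in for the adapter under test.
    // Assumption: close() is forwarded to the wrapped options object.
    static final class OptionsFacade implements AutoCloseable {
        private final RecordingOptions delegate;

        OptionsFacade(final RecordingOptions delegate) {
            this.delegate = delegate;
        }

        @Override
        public void close() {
            delegate.close();
        }
    }

    // Closes only the facade in the finally block and reports
    // how many times the underlying object was closed as a result.
    static int closeCallsAfterFacadeClose() {
        final RecordingOptions underlying = new RecordingOptions();
        final OptionsFacade facade = new OptionsFacade(underlying);
        try {
            // ... the per-method verification would run here ...
        } finally {
            facade.close();
        }
        return underlying.closeCalls;
    }

    public static void main(final String[] args) {
        System.out.println("close calls on underlying object: " + closeCallsAfterFacadeClose());
    }
}
```

Because the facade forwards `close()`, one call in the finally block releases the wrapped object too, so no second explicit close on the underlying object is needed.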



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##########
@@ -333,23 +343,23 @@ public void shouldLogWarningWhenSettingWalOptions() throws Exception {
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class)) {
 
-            final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter
-                = new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions());
-
-            for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) {
-                if (walRelatedMethods.contains(method.getName())) {
-                    method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes()));
+            try (RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter =
+                     new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions())) {
+                for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) {
+                    if (walRelatedMethods.contains(method.getName())) {
+                        method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes()));
+                    }
                 }
-            }
 
-            final List<String> walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds");
+                final List<String> walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds");
 
-            final Set<String> logMessages = appender.getEvents().stream()
-                .filter(e -> e.getLevel().equals("WARN"))
-                .map(LogCaptureAppender.Event::getMessage)
-                .collect(Collectors.toSet());
+                final Set<String> logMessages = appender.getEvents().stream()
+                    .filter(e -> e.getLevel().equals("WARN"))
+                    .map(LogCaptureAppender.Event::getMessage)
+                    .collect(Collectors.toSet());
 
-            walOptions.forEach(option -> assertThat(logMessages, hasItem(String.format("WAL is explicitly disabled by Streams in RocksDB. Setting option '%s' will be ignored", option))));
+                walOptions.forEach(option -> assertThat(logMessages, hasItem(String.format("WAL is explicitly disabled by Streams in RocksDB. Setting option '%s' will be ignored", option))));
+            }
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java:
##########
@@ -480,6 +480,7 @@ public void shouldCreateWriteBatches() {
         assertEquals(2, writeBatchMap.size());
         for (final WriteBatch batch : writeBatchMap.values()) {
             assertEquals(1, batch.count());
+            batch.close();

Review Comment:
   Nice catch!



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:
##########
@@ -253,6 +258,11 @@ private void verifyColumnFamilyOptionsMethodCall(final Method method) throws Exc
             assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class));
             assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
                 matchesPattern("Unexpected method call ColumnFamilyOptions\\." + method.getName() +  "(.*)"));
+        } finally {
+            reset(mockedColumnFamilyOptions);

Review Comment:
   See my comments above.





[GitHub] [kafka] cadonna merged pull request #12959: MINOR: Fix various memory leaks in tests

Posted by GitBox <gi...@apache.org>.
cadonna merged PR #12959:
URL: https://github.com/apache/kafka/pull/12959

