You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/06 06:32:36 UTC
[pulsar] branch master updated: Fix memory leak when running topic
compaction. (#6485)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f2ec1b4 Fix memory leak when running topic compaction. (#6485)
f2ec1b4 is described below
commit f2ec1b4e2836859b0a6beb9b5a12656e4bcaf8f9
Author: Rolf Arne Corneliussen <ra...@users.noreply.github.com>
AuthorDate: Fri Mar 6 07:32:28 2020 +0100
Fix memory leak when running topic compaction. (#6485)
Fixes #6482
### Motivation
Prevent topic compaction from leaking direct memory
### Modifications
Several leaks were discovered using Netty leak detection and code review.
* `CompactedTopicImpl.readOneMessageId` would get an `Enumeration` of `LedgerEntry`, but did not release the underlying buffers. Fix: iterate though the `Enumeration` and release underlying buffer. Instead of logging the case where the `Enumeration` did not contain any elements, complete the future exceptionally with the message (will be logged by Caffeine).
* Two main sources of leak in `TwoPhaseCompactor`. The `RawBacthConverter.rebatchMessage` method failed to close/release a `ByteBuf` (uncompressedPayload). Also, the return ByteBuf of `RawBacthConverter.rebatchMessage` was not closed. The first one was easy to fix (release buffer), to fix the second one and make the code easier to read, I decided to not let `RawBacthConverter.rebatchMessage` close the message read from the topic, instead the message read from the topic can be closed [...]
### Verifying this change
Modified `RawReaderTest.testBatchingRebatch` to show new contract.
One can run the test described to reproduce the issue, to verify no leak is detected.
---
.../pulsar/client/impl/RawBatchConverter.java | 5 +-
.../pulsar/compaction/CompactedTopicImpl.java | 19 ++--
.../pulsar/compaction/TwoPhaseCompactor.java | 119 +++++++++++----------
.../apache/pulsar/client/impl/RawReaderTest.java | 4 +-
4 files changed, 82 insertions(+), 65 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index e252426..8c21a73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -91,8 +91,7 @@ public class RawBatchConverter {
* Take a batched message and a filter, and returns a message with the only the sub-messages
* which match the filter. Returns an empty optional if no messages match.
*
- * This takes ownership of the passes in message, and if the returned optional is not empty,
- * the ownership of that message is returned also.
+ * NOTE: this message does not alter the reference count of the RawMessage argument.
*/
public static Optional<RawMessage> rebatchMessage(RawMessage msg,
BiPredicate<String, MessageId> filter)
@@ -161,9 +160,9 @@ public class RawBatchConverter {
return Optional.empty();
}
} finally {
+ uncompressedPayload.release();
batchBuffer.release();
metadata.recycle();
- msg.close();
}
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index b1378b6..22efe8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -164,12 +164,19 @@ public class CompactedTopicImpl implements CompactedTopic {
if (rc != BKException.Code.OK) {
promise.completeExceptionally(BKException.create(rc));
} else {
- try (RawMessage m = RawMessageImpl.deserializeFrom(
- seq.nextElement().getEntryBuffer())) {
- promise.complete(m.getMessageIdData());
- } catch (NoSuchElementException e) {
- log.error("No such entry {} in ledger {}", entryId, lh.getId());
- promise.completeExceptionally(e);
+ // Need to release buffers for all entries in the sequence
+ if (seq.hasMoreElements()) {
+ LedgerEntry entry = seq.nextElement();
+ try (RawMessage m = RawMessageImpl.deserializeFrom(entry.getEntryBuffer())) {
+ entry.getEntryBuffer().release();
+ while (seq.hasMoreElements()) {
+ seq.nextElement().getEntryBuffer().release();
+ }
+ promise.complete(m.getMessageIdData());
+ }
+ } else {
+ promise.completeExceptionally(new NoSuchElementException(
+ String.format("No such entry %d in ledger %d", entryId, lh.getId())));
}
}
}, null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 95f6f1a..a275bb5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -212,77 +212,88 @@ public class TwoPhaseCompactor extends Compactor {
private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey,
LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
+ if (promise.isDone()) {
+ return;
+ }
reader.readNextAsync().whenCompleteAsync(
(m, exception) -> {
if (exception != null) {
promise.completeExceptionally(exception);
return;
} else if (promise.isDone()) {
+ m.close();
return;
}
- MessageId id = m.getMessageId();
- Optional<RawMessage> messageToAdd = Optional.empty();
- if (RawBatchConverter.isReadableBatch(m)) {
- try {
- messageToAdd = RawBatchConverter.rebatchMessage(
- m, (key, subid) -> latestForKey.get(key).equals(subid));
- } catch (IOException ioe) {
- log.info("Error decoding batch for message {}. Whole batch will be included in output",
- id, ioe);
- messageToAdd = Optional.of(m);
- }
- } else {
- Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
- MessageId msg;
- if (keyAndSize == null) { // pass through messages without a key
- messageToAdd = Optional.of(m);
- } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
- && msg.equals(id)) { // consider message only if present into latestForKey map
- if (keyAndSize.getRight() <= 0) {
- promise.completeExceptionally(new IllegalArgumentException(
- "Compaction phase found empty record from sorted key-map"));
+ try {
+ MessageId id = m.getMessageId();
+ Optional<RawMessage> messageToAdd = Optional.empty();
+ if (RawBatchConverter.isReadableBatch(m)) {
+ try {
+ messageToAdd = RawBatchConverter.rebatchMessage(
+ m, (key, subid) -> latestForKey.get(key).equals(subid));
+ } catch (IOException ioe) {
+ log.info("Error decoding batch for message {}. Whole batch will be included in output",
+ id, ioe);
+ messageToAdd = Optional.of(m);
}
- messageToAdd = Optional.of(m);
} else {
- m.close();
+ Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
+ MessageId msg;
+ if (keyAndSize == null) { // pass through messages without a key
+ messageToAdd = Optional.of(m);
+ } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
+ && msg.equals(id)) { // consider message only if present into latestForKey map
+ if (keyAndSize.getRight() <= 0) {
+ promise.completeExceptionally(new IllegalArgumentException(
+ "Compaction phase found empty record from sorted key-map"));
+ }
+ messageToAdd = Optional.of(m);
+ }
}
- }
- if (messageToAdd.isPresent()) {
- try {
- outstanding.acquire();
- CompletableFuture<Void> addFuture = addToCompactedLedger(lh, messageToAdd.get())
- .whenComplete((res, exception2) -> {
- outstanding.release();
- if (exception2 != null) {
- promise.completeExceptionally(exception2);
+ if (messageToAdd.isPresent()) {
+ RawMessage message = messageToAdd.get();
+ try {
+ outstanding.acquire();
+ CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message)
+ .whenComplete((res, exception2) -> {
+ outstanding.release();
+ if (exception2 != null) {
+ promise.completeExceptionally(exception2);
+ }
+ });
+ if (to.equals(id)) {
+ addFuture.whenComplete((res, exception2) -> {
+ if (exception2 == null) {
+ promise.complete(null);
}
});
- if (to.equals(id)) {
- addFuture.whenComplete((res, exception2) -> {
- if (exception2 == null) {
- promise.complete(null);
- }
- });
+ }
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ promise.completeExceptionally(ie);
+ } finally {
+ if (message != m) {
+ message.close();
+ }
}
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- promise.completeExceptionally(ie);
- }
- } else if (to.equals(id)) {
- // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not
- // present under latestForKey. Complete the compaction.
- try {
- // make sure all inflight writes have finished
- outstanding.acquire(MAX_OUTSTANDING);
- promise.complete(null);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- promise.completeExceptionally(e);
+ } else if (to.equals(id)) {
+ // Reached to last-id and phase-one found it deleted-message while iterating on ledger so,
+ // not present under latestForKey. Complete the compaction.
+ try {
+ // make sure all inflight writes have finished
+ outstanding.acquire(MAX_OUTSTANDING);
+ promise.complete(null);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ promise.completeExceptionally(e);
+ }
+ return;
}
- return;
+ phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
+ } finally {
+ m.close();
}
- phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
}, scheduler);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index b0c7cd1..5ae4618 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -319,13 +319,13 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
}
RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
- try {
- RawMessage m1 = reader.readNextAsync().get();
+ try (RawMessage m1 = reader.readNextAsync().get()) {
RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> key.equals("key2")).get();
List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m2);
Assert.assertEquals(idsAndKeys.size(), 1);
Assert.assertEquals(idsAndKeys.get(0).getRight(), "key2");
m2.close();
+ Assert.assertEquals(m1.getHeadersAndPayload().refCnt(), 1);
} finally {
reader.closeAsync().get();
}