You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/06 06:32:36 UTC

[pulsar] branch master updated: Fix memory leak when running topic compaction. (#6485)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f2ec1b4  Fix memory leak when running topic compaction. (#6485)
f2ec1b4 is described below

commit f2ec1b4e2836859b0a6beb9b5a12656e4bcaf8f9
Author: Rolf Arne Corneliussen <ra...@users.noreply.github.com>
AuthorDate: Fri Mar 6 07:32:28 2020 +0100

    Fix memory leak when running topic compaction. (#6485)
    
    
    Fixes #6482
    
    ### Motivation
    Prevent topic compaction from leaking direct memory
    
    ### Modifications
    
    Several leaks were discovered using Netty leak detection and code review.
    * `CompactedTopicImpl.readOneMessageId` would get an `Enumeration` of `LedgerEntry`, but did not release the underlying buffers. Fix: iterate through the `Enumeration` and release the underlying buffers. Instead of logging the case where the `Enumeration` did not contain any elements, complete the future exceptionally with the message (will be logged by Caffeine).
    * Two main sources of leak in `TwoPhaseCompactor`. The `RawBatchConverter.rebatchMessage` method failed to close/release a `ByteBuf` (uncompressedPayload). Also, the `ByteBuf` returned by `RawBatchConverter.rebatchMessage` was not closed. The first one was easy to fix (release the buffer). To fix the second one and make the code easier to read, I decided not to let `RawBatchConverter.rebatchMessage` close the message read from the topic; instead, the message read from the topic can be closed [...]
    
    ### Verifying this change
    Modified `RawReaderTest.testBatchingRebatch` to show new contract.
    
    One can also run the test described for reproducing the issue and verify that no leak is detected.
---
 .../pulsar/client/impl/RawBatchConverter.java      |   5 +-
 .../pulsar/compaction/CompactedTopicImpl.java      |  19 ++--
 .../pulsar/compaction/TwoPhaseCompactor.java       | 119 +++++++++++----------
 .../apache/pulsar/client/impl/RawReaderTest.java   |   4 +-
 4 files changed, 82 insertions(+), 65 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index e252426..8c21a73 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -91,8 +91,7 @@ public class RawBatchConverter {
      * Take a batched message and a filter, and returns a message with the only the sub-messages
      * which match the filter. Returns an empty optional if no messages match.
      *
-     * This takes ownership of the passes in message, and if the returned optional is not empty,
-     * the ownership of that message is returned also.
+     *  NOTE: this message does not alter the reference count of the RawMessage argument.
      */
     public static Optional<RawMessage> rebatchMessage(RawMessage msg,
                                                       BiPredicate<String, MessageId> filter)
@@ -161,9 +160,9 @@ public class RawBatchConverter {
                 return Optional.empty();
             }
         } finally {
+            uncompressedPayload.release();
             batchBuffer.release();
             metadata.recycle();
-            msg.close();
         }
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index b1378b6..22efe8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -164,12 +164,19 @@ public class CompactedTopicImpl implements CompactedTopic {
                                 if (rc != BKException.Code.OK) {
                                     promise.completeExceptionally(BKException.create(rc));
                                 } else {
-                                    try (RawMessage m = RawMessageImpl.deserializeFrom(
-                                                 seq.nextElement().getEntryBuffer())) {
-                                        promise.complete(m.getMessageIdData());
-                                    } catch (NoSuchElementException e) {
-                                        log.error("No such entry {} in ledger {}", entryId, lh.getId());
-                                        promise.completeExceptionally(e);
+                                    // Need to release buffers for all entries in the sequence
+                                    if (seq.hasMoreElements()) {
+                                        LedgerEntry entry = seq.nextElement();
+                                        try (RawMessage m = RawMessageImpl.deserializeFrom(entry.getEntryBuffer())) {
+                                            entry.getEntryBuffer().release();
+                                            while (seq.hasMoreElements()) {
+                                                seq.nextElement().getEntryBuffer().release();
+                                            }
+                                            promise.complete(m.getMessageIdData());
+                                        }
+                                    } else {
+                                        promise.completeExceptionally(new NoSuchElementException(
+                                                String.format("No such entry %d in ledger %d", entryId, lh.getId())));
                                     }
                                 }
                             }, null);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 95f6f1a..a275bb5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -212,77 +212,88 @@ public class TwoPhaseCompactor extends Compactor {
 
     private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey,
                               LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
+        if (promise.isDone()) {
+            return;
+        }
         reader.readNextAsync().whenCompleteAsync(
                 (m, exception) -> {
                     if (exception != null) {
                         promise.completeExceptionally(exception);
                         return;
                     } else if (promise.isDone()) {
+                        m.close();
                         return;
                     }
-                    MessageId id = m.getMessageId();
-                    Optional<RawMessage> messageToAdd = Optional.empty();
-                    if (RawBatchConverter.isReadableBatch(m)) {
-                        try {
-                            messageToAdd = RawBatchConverter.rebatchMessage(
-                                    m, (key, subid) -> latestForKey.get(key).equals(subid));
-                        } catch (IOException ioe) {
-                            log.info("Error decoding batch for message {}. Whole batch will be included in output",
-                                     id, ioe);
-                            messageToAdd = Optional.of(m);
-                        }
-                    } else {
-                        Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
-                        MessageId msg;
-                        if (keyAndSize == null) { // pass through messages without a key
-                            messageToAdd = Optional.of(m);
-                        } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
-                                && msg.equals(id)) { // consider message only if present into latestForKey map
-                            if (keyAndSize.getRight() <= 0) {
-                                promise.completeExceptionally(new IllegalArgumentException(
-                                        "Compaction phase found empty record from sorted key-map"));
+                    try {
+                        MessageId id = m.getMessageId();
+                        Optional<RawMessage> messageToAdd = Optional.empty();
+                        if (RawBatchConverter.isReadableBatch(m)) {
+                            try {
+                                messageToAdd = RawBatchConverter.rebatchMessage(
+                                        m, (key, subid) -> latestForKey.get(key).equals(subid));
+                            } catch (IOException ioe) {
+                                log.info("Error decoding batch for message {}. Whole batch will be included in output",
+                                        id, ioe);
+                                messageToAdd = Optional.of(m);
                             }
-                            messageToAdd = Optional.of(m);
                         } else {
-                            m.close();
+                            Pair<String,Integer> keyAndSize = extractKeyAndSize(m);
+                            MessageId msg;
+                            if (keyAndSize == null) { // pass through messages without a key
+                                messageToAdd = Optional.of(m);
+                            } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null
+                                    && msg.equals(id)) { // consider message only if present into latestForKey map
+                                if (keyAndSize.getRight() <= 0) {
+                                    promise.completeExceptionally(new IllegalArgumentException(
+                                            "Compaction phase found empty record from sorted key-map"));
+                                }
+                                messageToAdd = Optional.of(m);
+                            }
                         }
-                    }
 
-                    if (messageToAdd.isPresent()) {
-                        try {
-                            outstanding.acquire();
-                            CompletableFuture<Void> addFuture = addToCompactedLedger(lh, messageToAdd.get())
-                                    .whenComplete((res, exception2) -> {
-                                        outstanding.release();
-                                        if (exception2 != null) {
-                                            promise.completeExceptionally(exception2);
+                        if (messageToAdd.isPresent()) {
+                            RawMessage message = messageToAdd.get();
+                            try {
+                                outstanding.acquire();
+                                CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message)
+                                        .whenComplete((res, exception2) -> {
+                                            outstanding.release();
+                                            if (exception2 != null) {
+                                                promise.completeExceptionally(exception2);
+                                            }
+                                        });
+                                if (to.equals(id)) {
+                                    addFuture.whenComplete((res, exception2) -> {
+                                        if (exception2 == null) {
+                                            promise.complete(null);
                                         }
                                     });
-                            if (to.equals(id)) {
-                                addFuture.whenComplete((res, exception2) -> {
-                                    if (exception2 == null) {
-                                        promise.complete(null);
-                                    }
-                                });
+                                }
+                            } catch (InterruptedException ie) {
+                                Thread.currentThread().interrupt();
+                                promise.completeExceptionally(ie);
+                            } finally {
+                                if (message != m) {
+                                    message.close();
+                                }
                             }
-                        } catch (InterruptedException ie) {
-                            Thread.currentThread().interrupt();
-                            promise.completeExceptionally(ie);
-                        }
-                    } else if (to.equals(id)) {
-                        // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, not
-                        // present under latestForKey. Complete the compaction.
-                        try {
-                            // make sure all inflight writes have finished
-                            outstanding.acquire(MAX_OUTSTANDING);
-                            promise.complete(null);
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                            promise.completeExceptionally(e);
+                        } else if (to.equals(id)) {
+                            // Reached to last-id and phase-one found it deleted-message while iterating on ledger so,
+                            // not present under latestForKey. Complete the compaction.
+                            try {
+                                // make sure all inflight writes have finished
+                                outstanding.acquire(MAX_OUTSTANDING);
+                                promise.complete(null);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                promise.completeExceptionally(e);
+                            }
+                            return;
                         }
-                        return;
+                        phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
+                    } finally {
+                        m.close();
                     }
-                    phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
                 }, scheduler);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index b0c7cd1..5ae4618 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -319,13 +319,13 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
         }
 
         RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
-        try {
-            RawMessage m1 = reader.readNextAsync().get();
+        try (RawMessage m1 = reader.readNextAsync().get()) {
             RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> key.equals("key2")).get();
             List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m2);
             Assert.assertEquals(idsAndKeys.size(), 1);
             Assert.assertEquals(idsAndKeys.get(0).getRight(), "key2");
             m2.close();
+            Assert.assertEquals(m1.getHeadersAndPayload().refCnt(), 1);
         } finally {
             reader.closeAsync().get();
         }