You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/09/20 03:22:07 UTC
[pulsar] branch master updated: [fix][schema]ledger handle leak when update schema (#17283)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 26204503494 [fix][schema]ledger handle leak when update schema (#17283)
26204503494 is described below
commit 26204503494871db3818b4d2f35071c6ee1b5b96
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Tue Sep 20 11:21:54 2022 +0800
[fix][schema]ledger handle leak when update schema (#17283)
### Motivation
in the schema update, will create a `ledgerHandle` and write data to BK, after that `ledgerHandle` is no longer useful and no other object holds references to it. `ledgerHandle` will be recycled with GC, but `ledgerHandle` also hold external connections, which will cause leakage.
https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L452-L456
### Modifications
after the schema is updated, close the `ledgerHandle`, just like schema-read:
https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L519-L525
---
.../service/schema/BookkeeperSchemaStorage.java | 13 ++++----
.../SchemaCompatibilityCheckTest.java | 35 ++++++++++++++++++++++
.../bookkeeper/client/PulsarMockLedgerHandle.java | 4 +++
3 files changed, 47 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 8043d3527de..b62335d9ab4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -437,11 +437,14 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
byte[] data
) {
SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, data);
- return createLedger(schemaId).thenCompose(ledgerHandle ->
- addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
- Functions.newPositionInfo(ledgerHandle.getId(), entryId)
- )
- );
+ return createLedger(schemaId).thenCompose(ledgerHandle -> {
+ final long ledgerId = ledgerHandle.getId();
+ return addEntry(ledgerHandle, schemaEntry)
+ .thenApply(entryId -> {
+ ledgerHandle.closeAsync();
+ return Functions.newPositionInfo(ledgerId, entryId);
+ });
+ });
}
@NotNull
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 1b5e4d67232..1167eff2ab0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
@@ -478,7 +479,41 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
consumerOne.close();
producerOne.close();
+ }
+ @Test
+ public void testSchemaLedgerAutoRelease() throws Exception {
+ String namespaceName = PUBLIC_TENANT + "/default";
+ String topicName = "persistent://" + namespaceName + "/tp";
+ admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
+ admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+ // Update schema 100 times.
+ for (int i = 0; i < 100; i++){
+ Schema schema = Schema.JSON(SchemaDefinition.builder()
+ .withJsonDef(String.format("""
+ {
+ "type": "record",
+ "name": "Test_Pojo",
+ "namespace": "org.apache.pulsar.schema.compatibility",
+ "fields": [{
+ "name": "prop_%s",
+ "type": ["null", "string"],
+ "default": null
+ }]
+ }
+ """, i))
+ .build());
+ Producer producer = pulsarClient
+ .newProducer(schema)
+ .topic(topicName)
+ .create();
+ producer.close();
+ }
+ // The other ledgers are about 5.
+ Assert.assertTrue(mockBookKeeper.getLedgerMap().values().stream()
+ .filter(ledger -> !ledger.isFenced())
+ .collect(Collectors.toList()).size() < 20);
+ admin.topics().delete(topicName, true);
}
@Test
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 7378a6f9106..7842959ee25 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.client;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -29,6 +30,7 @@ import java.util.Enumeration;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
@@ -58,6 +60,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
final byte[] passwd;
final ReadHandle readHandle;
long lastEntry = -1;
+ @VisibleForTesting
+ @Getter
boolean fenced = false;
public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,