Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/20 00:49:44 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #17283: [fix][schema]ledger handle leak when update schema

poorbarcode opened a new pull request, #17283:
URL: https://github.com/apache/pulsar/pull/17283

   ### Motivation
   
   During a schema update, the broker creates a `ledgerHandle` and writes data to BookKeeper (BK). After the write, the `ledgerHandle` is no longer used and no other object holds a reference to it. The object itself will be reclaimed by GC, but the `ledgerHandle` also holds external connections, and those leak because the handle is never closed.
   
   https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L452-L456
   
   ### Modifications
   
   After the schema is updated, close the `ledgerHandle`, as the schema-read path already does:
   
   https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L519-L525
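
The close-after-write idea can be sketched in isolation as below. This is a minimal illustration and not the actual Pulsar or BookKeeper code: `FakeLedgerHandle`, `Position`, and `writeAndClose` are hypothetical stand-ins invented for the example, and only the shape of the `CompletableFuture` chain is meant to carry over.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class CloseAfterWriteSketch {

    /** Hypothetical stand-in for a ledger handle; NOT the real BookKeeper API. */
    static final class FakeLedgerHandle {
        private final long id;
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private long nextEntryId = 0;

        FakeLedgerHandle(long id) {
            this.id = id;
        }

        long getId() {
            return id;
        }

        boolean isClosed() {
            return closed.get();
        }

        /** Pretends to write an entry and returns its entry id. */
        CompletableFuture<Long> addEntry(byte[] data) {
            if (closed.get()) {
                CompletableFuture<Long> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("ledger closed"));
                return failed;
            }
            return CompletableFuture.completedFuture(nextEntryId++);
        }

        /** Pretends to release the external resources held by the handle. */
        CompletableFuture<Void> closeAsync() {
            closed.set(true);
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Hypothetical (ledgerId, entryId) pair describing where the entry was stored. */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    /** Writes one entry, closes the handle once the write succeeds, then reports the position. */
    static CompletableFuture<Position> writeAndClose(FakeLedgerHandle handle, byte[] data) {
        final long ledgerId = handle.getId();
        return handle.addEntry(data)
                // Close only after the write succeeded; keep the entry id as the chain's value.
                .thenCompose(entryId -> handle.closeAsync().thenApply(ignored -> entryId))
                .thenApply(entryId -> new Position(ledgerId, entryId));
    }

    public static void main(String[] args) {
        FakeLedgerHandle handle = new FakeLedgerHandle(7L);
        Position position = writeAndClose(handle, new byte[] {1, 2, 3}).join();
        System.out.println("ledger=" + position.ledgerId
                + " entry=" + position.entryId
                + " closed=" + handle.isClosed());
    }
}
```

In this sketch the handle is closed only on the success path; a failed write leaves it open, so real code would likely need to handle that case as well.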
   
   ### Documentation
   
   
   - [ ] `doc-required`
   - [x] `doc-not-needed`
   - [ ] `doc`
   - [ ] `doc-complete`
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   
   - https://github.com/poorbarcode/pulsar/pull/6
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17283: [fix][schema]ledger handle leak when update schema

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17283:
URL: https://github.com/apache/pulsar/pull/17283#discussion_r974131093


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java:
##########
@@ -437,11 +437,12 @@ private CompletableFuture<SchemaStorageFormat.PositionInfo> addNewSchemaEntryToS
         byte[] data
     ) {
         SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, data);
-        return createLedger(schemaId).thenCompose(ledgerHandle ->
-            addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
-                Functions.newPositionInfo(ledgerHandle.getId(), entryId)
-            )
-        );
+        return createLedger(schemaId).thenCompose(ledgerHandle -> {
+            final long ledgerId = ledgerHandle.getId();
+            return addEntry(ledgerHandle, schemaEntry)
+                    .thenCompose(entryId -> ledgerHandle.closeAsync().thenApply(__ -> entryId))

Review Comment:
   I think we only need to trigger the close? The client side doesn't need to wait for the close operation to finish.
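
A rough sketch of what "trigger the close without waiting" could look like, in contrast to the `thenCompose` chain in the hunk above. The names here (`SlowCloseHandle`, `writeThenTriggerClose`, `closeError`) are hypothetical and exist only for this illustration; this is not the code that was eventually merged.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class FireAndForgetCloseSketch {

    /** Hypothetical handle whose close finishes at some later time. */
    static final class SlowCloseHandle {
        // Completed by the driver to simulate the close finishing (or failing).
        final CompletableFuture<Void> pendingClose = new CompletableFuture<>();
        boolean closeRequested = false;

        CompletableFuture<Long> addEntry(byte[] data) {
            return CompletableFuture.completedFuture(0L);
        }

        CompletableFuture<Void> closeAsync() {
            closeRequested = true;
            return pendingClose;
        }
    }

    /**
     * Writes an entry and starts closing the handle, but the returned future
     * depends only on the write, not on the close.
     */
    static CompletableFuture<Long> writeThenTriggerClose(
            SlowCloseHandle handle, byte[] data, AtomicReference<Throwable> closeError) {
        return handle.addEntry(data).thenApply(entryId -> {
            // Trigger the close; record a failure instead of propagating it to the caller.
            handle.closeAsync().whenComplete((ignored, error) -> {
                if (error != null) {
                    closeError.set(error);
                }
            });
            return entryId;
        });
    }

    public static void main(String[] args) {
        SlowCloseHandle handle = new SlowCloseHandle();
        AtomicReference<Throwable> closeError = new AtomicReference<>();
        CompletableFuture<Long> result =
                writeThenTriggerClose(handle, new byte[] {1}, closeError);

        // The write result is available while the close is still in flight.
        System.out.println("write done: " + result.isDone());
        System.out.println("close done: " + handle.pendingClose.isDone());

        handle.pendingClose.complete(null);
    }
}
```

The trade-off in this sketch is that a close failure no longer fails the schema update; it can only be logged or recorded, which is why the example stores it in `closeError`.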



##########
pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java:
##########
@@ -478,7 +479,52 @@ public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS
 
         consumerOne.close();
         producerOne.close();
+    }
+
+    @Test
+    public void testSchemaLedgerAutoRelease() throws Exception {
+        String namespaceName = PUBLIC_TENANT + "/default";
+        String topicName = "persistent://" + namespaceName + "/tp";
+        admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        // Update schema 100 times.
+        for (int i = 0; i < 100; i++){
+            Schema schema = Schema.JSON(SchemaDefinition.builder()
+                    .withJsonDef(String.format("""
+                            {
+                            	"type": "record",
+                            	"name": "Test_Pojo",
+                            	"namespace": "org.apache.pulsar.schema.compatibility",
+                            	"fields": [{
+                            		"name": "prop_%s",
+                            		"type": ["null", "string"],
+                            		"default": null
+                            	}]
+                            }
+                            """, i))
+                    .build());
+            Producer producer = pulsarClient
+                    .newProducer(schema)
+                    .topic(topicName)
+                    .create();
+            producer.close();
+        }
+        // The other ledgers are about 5.
+        Assert.assertTrue(mockBookKeeper.getLedgerMap().values().stream()
+                .filter(ledger -> !ledger.isFenced())
+                .collect(Collectors.toList()).size() < 20);
+        admin.topics().delete(topicName, true);
+    }
+
+    private static class DynamicClassLoader extends ClassLoader{

Review Comment:
   We don't need this class anymore?





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17283: [fix][schema]ledger handle leak when update schema

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17283:
URL: https://github.com/apache/pulsar/pull/17283#discussion_r974393870


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java:
##########
@@ -437,11 +437,12 @@ private CompletableFuture<SchemaStorageFormat.PositionInfo> addNewSchemaEntryToS
         byte[] data
     ) {
         SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, data);
-        return createLedger(schemaId).thenCompose(ledgerHandle ->
-            addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
-                Functions.newPositionInfo(ledgerHandle.getId(), entryId)
-            )
-        );
+        return createLedger(schemaId).thenCompose(ledgerHandle -> {
+            final long ledgerId = ledgerHandle.getId();
+            return addEntry(ledgerHandle, schemaEntry)
+                    .thenCompose(entryId -> ledgerHandle.closeAsync().thenApply(__ -> entryId))

Review Comment:
   Already fixed





[GitHub] [pulsar] codelipenghui closed pull request #17283: [fix][schema]ledger handle leak when update schema

Posted by GitBox <gi...@apache.org>.
codelipenghui closed pull request #17283: [fix][schema]ledger handle leak when update schema
URL: https://github.com/apache/pulsar/pull/17283




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17283: [fix][schema]ledger handle leak when update schema

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17283:
URL: https://github.com/apache/pulsar/pull/17283#discussion_r974394095


##########
pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java:
##########
@@ -478,7 +479,52 @@ public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS
 
         consumerOne.close();
         producerOne.close();
+    }
+
+    @Test
+    public void testSchemaLedgerAutoRelease() throws Exception {
+        String namespaceName = PUBLIC_TENANT + "/default";
+        String topicName = "persistent://" + namespaceName + "/tp";
+        admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        // Update schema 100 times.
+        for (int i = 0; i < 100; i++){
+            Schema schema = Schema.JSON(SchemaDefinition.builder()
+                    .withJsonDef(String.format("""
+                            {
+                            	"type": "record",
+                            	"name": "Test_Pojo",
+                            	"namespace": "org.apache.pulsar.schema.compatibility",
+                            	"fields": [{
+                            		"name": "prop_%s",
+                            		"type": ["null", "string"],
+                            		"default": null
+                            	}]
+                            }
+                            """, i))
+                    .build());
+            Producer producer = pulsarClient
+                    .newProducer(schema)
+                    .topic(topicName)
+                    .create();
+            producer.close();
+        }
+        // The other ledgers are about 5.
+        Assert.assertTrue(mockBookKeeper.getLedgerMap().values().stream()
+                .filter(ledger -> !ledger.isFenced())
+                .collect(Collectors.toList()).size() < 20);
+        admin.topics().delete(topicName, true);
+    }
+
+    private static class DynamicClassLoader extends ClassLoader{

Review Comment:
   Already removed this internal class.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17283: [fix][schema]ledger handle leak when update schema

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17283:
URL: https://github.com/apache/pulsar/pull/17283#discussion_r971171379


##########
pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java:
##########
@@ -478,7 +483,56 @@ public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS
 
         consumerOne.close();
         producerOne.close();
+    }
 
+    @Test
+    public void testSchemaLedgerAutoRelease() throws Exception {
+        String namespaceName = PUBLIC_TENANT + "/default";
+        String topicName = "persistent://" + namespaceName + "/tp";
+        admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        // Update schema 100 times.
+        ArrayList<Class> classes = createManyClass(classLoader, 100);

Review Comment:
   Now using `SchemaDefinitionBuilder` instead of dynamically generating classes.





[GitHub] [pulsar] codelipenghui commented on pull request #17283: [fix][schema]ledger handle leak when update schema

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17283:
URL: https://github.com/apache/pulsar/pull/17283#issuecomment-1251696751

   /pulsarbot run-failure-checks




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17283: [fix][schema]ledger handle leak when update schema

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17283:
URL: https://github.com/apache/pulsar/pull/17283#discussion_r970925823


##########
pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java:
##########
@@ -478,7 +483,56 @@ public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS
 
         consumerOne.close();
         producerOne.close();
+    }
 
+    @Test
+    public void testSchemaLedgerAutoRelease() throws Exception {
+        String namespaceName = PUBLIC_TENANT + "/default";
+        String topicName = "persistent://" + namespaceName + "/tp";
+        admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        // Update schema 100 times.
+        ArrayList<Class> classes = createManyClass(classLoader, 100);

Review Comment:
   I think it would be easier to create multiple schemas with the SchemaDefinitionBuilder.
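
As a rough illustration of this suggestion, distinct schema definitions can be produced as plain JSON strings in a loop, without generating classes at runtime. The helper name `distinctJsonDefs` is hypothetical; each resulting string is the kind of value that the revised test passes to `SchemaDefinition.builder().withJsonDef(...)`, which is left out here so the sketch runs without Pulsar on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

class SchemaDefSketch {

    /**
     * Builds {@code count} record definitions that differ only in the name of
     * their single nullable string field (prop_0, prop_1, ...).
     */
    static List<String> distinctJsonDefs(int count) {
        List<String> defs = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            defs.add(String.format(
                    "{\"type\":\"record\",\"name\":\"Test_Pojo\","
                    + "\"namespace\":\"org.apache.pulsar.schema.compatibility\","
                    + "\"fields\":[{\"name\":\"prop_%d\","
                    + "\"type\":[\"null\",\"string\"],\"default\":null}]}",
                    i));
        }
        return defs;
    }

    public static void main(String[] args) {
        List<String> defs = distinctJsonDefs(100);
        System.out.println(defs.size() + " definitions, first: " + defs.get(0));
    }
}
```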





[GitHub] [pulsar] congbobo184 merged pull request #17283: [fix][schema]ledger handle leak when update schema

Posted by GitBox <gi...@apache.org>.
congbobo184 merged PR #17283:
URL: https://github.com/apache/pulsar/pull/17283

