You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/20 08:56:25 UTC

[pulsar] branch branch-2.11 updated (ec2aef7f52d -> 0333becfbfe)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from ec2aef7f52d [fix][metadata] Cleanup state when lock revalidation gets `LockBusyException` (#17700)
     new ee30b137d0a [fix][client] Unwrap completion exception for Lookup Services (#17717)
     new 0333becfbfe [fix][schema]ledger handle leak when update schema (#17283)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../service/schema/BookkeeperSchemaStorage.java    | 13 ++++----
 .../SchemaCompatibilityCheckTest.java              | 35 ++++++++++++++++++++++
 .../client/impl/BinaryProtoLookupService.java      | 33 ++++++++++----------
 .../pulsar/client/impl/HttpLookupService.java      | 12 ++++----
 .../bookkeeper/client/PulsarMockLedgerHandle.java  |  4 +++
 5 files changed, 72 insertions(+), 25 deletions(-)


[pulsar] 02/02: [fix][schema]ledger handle leak when update schema (#17283)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0333becfbfebc1ae4a435d56deefe8dc22a68634
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Tue Sep 20 11:21:54 2022 +0800

    [fix][schema]ledger handle leak when update schema (#17283)
    
    ### Motivation
    
    During a schema update, the broker creates a `ledgerHandle` and writes data to BookKeeper (BK). After the write, the `ledgerHandle` is no longer useful and no other object holds a reference to it. The `ledgerHandle` object itself will be reclaimed by GC, but it also holds external connections, and those leak because the handle is never closed.
    
    https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L452-L456
    
    ### Modifications
    
    After the schema is updated, close the `ledgerHandle`, as the schema-read path already does:
    
    https://github.com/apache/pulsar/blob/40b9d7ea50cef54becb09f2543193e08375abe0b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L519-L525
---
 .../service/schema/BookkeeperSchemaStorage.java    | 13 ++++----
 .../SchemaCompatibilityCheckTest.java              | 35 ++++++++++++++++++++++
 .../bookkeeper/client/PulsarMockLedgerHandle.java  |  4 +++
 3 files changed, 47 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 88a94198f4f..4451cab7c92 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -449,11 +449,14 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
         byte[] data
     ) {
         SchemaStorageFormat.SchemaEntry schemaEntry = newSchemaEntry(index, data);
-        return createLedger(schemaId).thenCompose(ledgerHandle ->
-            addEntry(ledgerHandle, schemaEntry).thenApply(entryId ->
-                Functions.newPositionInfo(ledgerHandle.getId(), entryId)
-            )
-        );
+        return createLedger(schemaId).thenCompose(ledgerHandle -> {
+            final long ledgerId = ledgerHandle.getId();
+            return addEntry(ledgerHandle, schemaEntry)
+                    .thenApply(entryId -> {
+                        ledgerHandle.closeAsync();
+                        return Functions.newPositionInfo(ledgerId, entryId);
+                    });
+        });
     }
 
     @NotNull
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 1b5e4d67232..1167eff2ab0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.util.Collections;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.Consumer;
@@ -478,7 +479,41 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
 
         consumerOne.close();
         producerOne.close();
+    }
 
+    @Test
+    public void testSchemaLedgerAutoRelease() throws Exception {
+        String namespaceName = PUBLIC_TENANT + "/default";
+        String topicName = "persistent://" + namespaceName + "/tp";
+        admin.namespaces().createNamespace(namespaceName, Sets.newHashSet(CLUSTER_NAME));
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        // Update schema 100 times.
+        for (int i = 0; i < 100; i++){
+            Schema schema = Schema.JSON(SchemaDefinition.builder()
+                    .withJsonDef(String.format("""
+                            {
+                            	"type": "record",
+                            	"name": "Test_Pojo",
+                            	"namespace": "org.apache.pulsar.schema.compatibility",
+                            	"fields": [{
+                            		"name": "prop_%s",
+                            		"type": ["null", "string"],
+                            		"default": null
+                            	}]
+                            }
+                            """, i))
+                    .build());
+            Producer producer = pulsarClient
+                    .newProducer(schema)
+                    .topic(topicName)
+                    .create();
+            producer.close();
+        }
+        // The other ledgers are about 5.
+        Assert.assertTrue(mockBookKeeper.getLedgerMap().values().stream()
+                .filter(ledger -> !ledger.isFenced())
+                .collect(Collectors.toList()).size() < 20);
+        admin.topics().delete(topicName, true);
     }
 
     @Test
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index 7378a6f9106..7842959ee25 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.client;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -29,6 +30,7 @@ import java.util.Enumeration;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
@@ -58,6 +60,8 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
     final byte[] passwd;
     final ReadHandle readHandle;
     long lastEntry = -1;
+    @VisibleForTesting
+    @Getter
     boolean fenced = false;
 
     public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,


[pulsar] 01/02: [fix][client] Unwrap completion exception for Lookup Services (#17717)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ee30b137d0a8ec921cbab17c651e53cfb941eb5f
Author: Penghui Li <pe...@apache.org>
AuthorDate: Tue Sep 20 15:54:31 2022 +0800

    [fix][client] Unwrap completion exception for Lookup Services (#17717)
---
 .../client/impl/BinaryProtoLookupService.java      | 33 ++++++++++++----------
 .../pulsar/client/impl/HttpLookupService.java      | 12 ++++----
 2 files changed, 25 insertions(+), 20 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 18bffba8dc6..e0b85d7bb34 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,19 +142,21 @@ public class BinaryProtoLookupService implements LookupService {
                         // (2) redirect to given address if response is: redirect
                         if (r.redirect) {
                             findBroker(responseBrokerAddress, r.authoritative, topicName, redirectCount + 1)
-                                .thenAccept(addressFuture::complete).exceptionally((lookupException) -> {
-                                // lookup failed
-                                if (redirectCount > 0) {
-                                    if (log.isDebugEnabled()) {
-                                        log.debug("[{}] lookup redirection failed ({}) : {}", topicName,
-                                                redirectCount, lookupException.getMessage());
+                                .thenAccept(addressFuture::complete)
+                                .exceptionally((lookupException) -> {
+                                    Throwable cause = FutureUtil.unwrapCompletionException(lookupException);
+                                    // lookup failed
+                                    if (redirectCount > 0) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("[{}] lookup redirection failed ({}) : {}", topicName,
+                                                    redirectCount, cause.getMessage());
+                                        }
+                                    } else {
+                                        log.warn("[{}] lookup failed : {}", topicName,
+                                                cause.getMessage(), cause);
                                     }
-                                } else {
-                                    log.warn("[{}] lookup failed : {}", topicName,
-                                            lookupException.getMessage(), lookupException);
-                                }
-                                addressFuture.completeExceptionally(lookupException);
-                                return null;
+                                    addressFuture.completeExceptionally(cause);
+                                    return null;
                             });
                         } else {
                             // (3) received correct broker to connect
@@ -176,7 +179,7 @@ public class BinaryProtoLookupService implements LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(connectionException -> {
-            addressFuture.completeExceptionally(connectionException);
+            addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
             return null;
         });
         return addressFuture;
@@ -209,7 +212,7 @@ public class BinaryProtoLookupService implements LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(connectionException -> {
-            partitionFuture.completeExceptionally(connectionException);
+            partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException));
             return null;
         });
 
@@ -245,7 +248,7 @@ public class BinaryProtoLookupService implements LookupService {
                 client.getCnxPool().releaseConnection(clientCnx);
             });
         }).exceptionally(ex -> {
-            schemaFuture.completeExceptionally(ex);
+            schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
             return null;
         });
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index def19c45aff..d42bde828bc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -148,8 +148,9 @@ public class HttpLookupService implements LookupService {
                 });
                 future.complete(new GetTopicsResult(result, topicsHash, false, true));
             }).exceptionally(ex -> {
-                log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, ex.getMessage());
-                future.completeExceptionally(ex);
+                Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                log.warn("Failed to getTopicsUnderNamespace namespace {} {}.", namespace, cause.getMessage());
+                future.completeExceptionally(cause);
                 return null;
             });
         return future;
@@ -193,14 +194,15 @@ public class HttpLookupService implements LookupService {
                 future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response)));
             }
         }).exceptionally(ex -> {
-            if (ex.getCause() instanceof NotFoundException) {
+            Throwable cause = FutureUtil.unwrapCompletionException(ex);
+            if (cause instanceof NotFoundException) {
                 future.complete(Optional.empty());
             } else {
                 log.warn("Failed to get schema for topic {} version {}",
                         topicName,
                         version != null ? Base64.getEncoder().encodeToString(version) : null,
-                        ex.getCause());
-                future.completeExceptionally(ex);
+                        cause);
+                future.completeExceptionally(cause);
             }
             return null;
         });