You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:55:57 UTC

[pulsar] branch branch-2.8 updated (50da9b2 -> 4cf7165)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 50da9b2  fix non-persistent topic get partitioned metadata error on discovery (#10806)
     new 9c3e904  fix parseMessageMetadata error cause by not skip broker entry metadata (#10968)
     new 1a2f820  [Security] Upgrade commons-codec to 1.15 (#10864)
     new be1eedc  [Security] Upgrade vertx to 3.9.8 to address CVE-2019-17640 (#10889)
     new fc27d0e  Change the nar package name for pulsar-io-kafka-connect-adaptor (#10976)
     new b7259cd  fix NoClassDefFoundError - io.airlift.compress.lz4.UnsafeUtil (#10983)
     new d7bff1e  Fix incorrect port of advertisedListener (#10961)
     new c019c2a  [Broker] Fix create partitioned topic in replicated namespace (#10963)
     new 72f3911  Fix potential data lost on the system topic when topic compaction have not triggered yet (#11003)
     new 83a29ec  [Transaction] Fix broker init transaction related topic. (#11022)
     new a5e643c  [Python] Fixed import when AvroSchema is not being used (#11034)
     new d23c5ed  Made the PulsarClusterMetadataTeardown deletes idempotent (#11042)
     new 68889ef  [C++] Fix Windows 32 bits compile and runtime failures (#11082)
     new 4cf7165  [Issue 11075] Use the subscription name defined in function details (#11076)

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci-cpp-build-windows.yaml        |   7 +
 distribution/io/src/assemble/io.xml                |   2 +-
 distribution/server/src/assemble/LICENSE.bin.txt   |  13 +-
 pom.xml                                            |   6 +-
 .../pulsar/broker/ServiceConfigurationUtils.java   |   8 +-
 .../validator/MultipleListenerValidatorTest.java   |  15 ++-
 .../pulsar/PulsarClusterMetadataTeardown.java      |  27 ++--
 .../org/apache/pulsar/broker/PulsarService.java    |  26 +++-
 .../apache/pulsar/broker/admin/AdminResource.java  | 120 +++++++++++------
 .../broker/admin/impl/PersistentTopicsBase.java    |   2 -
 .../broker/admin/v1/NonPersistentTopics.java       |  15 ++-
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  15 ++-
 .../broker/admin/v2/NonPersistentTopics.java       |   6 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |   5 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   |   2 +-
 .../pulsar/broker/service/BrokerService.java       |  11 +-
 .../service/persistent/PersistentSubscription.java |   7 +-
 .../broker/service/persistent/PersistentTopic.java |  20 +--
 .../broker/service/persistent/SystemTopic.java     |  21 ++-
 .../apache/pulsar/compaction/CompactorTool.java    |   8 +-
 .../apache/pulsar/broker/PulsarServiceTest.java    |  19 ++-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  27 ++--
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  14 +-
 .../broker/service/BrokerEntryMetadataE2ETest.java |  20 +++
 .../pulsar/broker/service/ReplicatorTest.java      |  62 ++++++++-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |  53 ++++----
 .../pulsar/broker/transaction/TransactionTest.java | 142 +++++++++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   |   8 +-
 pulsar-client-cpp/README.md                        |   8 +-
 pulsar-client-cpp/lib/CMakeLists.txt               |   2 +-
 pulsar-client-cpp/lib/checksum/crc32c_sse42.cc     |  12 +-
 pulsar-client-cpp/python/pulsar/schema/__init__.py |   3 +-
 pulsar-client-cpp/python/pulsar/schema/schema.py   |  27 ----
 .../python/pulsar/schema/schema_avro.py            |  67 ++++++++++
 .../org/apache/pulsar/client/impl/MessageImpl.java |   2 -
 .../apache/pulsar/client/impl/MessageImplTest.java |  51 ++++++++
 .../apache/pulsar/common/protocol/Commands.java    |   5 +-
 .../instance/src/main/python/python_instance.py    |   9 +-
 pulsar-io/kafka-connect-adaptor-nar/pom.xml        |   3 +
 pulsar-sql/presto-distribution/LICENSE             |   6 +-
 .../cli/ClusterMetadataTearDownTest.java           |   3 +
 42 files changed, 675 insertions(+), 206 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
 create mode 100644 pulsar-client-cpp/python/pulsar/schema/schema_avro.py

[pulsar] 13/13: [Issue 11075] Use the subscription name defined in function details (#11076)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4cf71659b744cb779441fff1c91044c72e199b19
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Jun 25 04:26:32 2021 +0300

    [Issue 11075] Use the subscription name defined in function details (#11076)
    
    Fixes #11075
    
    ### Motivation
    
    See #11075: Python functions don't use the given subscription name for the input topic consumer.
    
    ### Modifications
    
    Use `function_details.source.subscriptionName` as the subscription name if it's non-blank.
    
    (cherry picked from commit 05ca2b546b9f68ce9d4a8166b92b40239c5cbe0f)
---
 pulsar-functions/instance/src/main/python/python_instance.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index fecde7a..54e4a34 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -136,9 +136,12 @@ class PythonInstance(object):
     if self.instance_config.function_details.source.subscriptionType == Function_pb2.SubscriptionType.Value("FAILOVER"):
       mode = pulsar._pulsar.ConsumerType.Failover
 
-    subscription_name = str(self.instance_config.function_details.tenant) + "/" + \
-                        str(self.instance_config.function_details.namespace) + "/" + \
-                        str(self.instance_config.function_details.name)
+    subscription_name = self.instance_config.function_details.source.subscriptionName    
+
+    if not (subscription_name and subscription_name.strip()):
+      subscription_name = str(self.instance_config.function_details.tenant) + "/" + \
+                          str(self.instance_config.function_details.namespace) + "/" + \
+                          str(self.instance_config.function_details.name)
 
     properties = util.get_properties(util.getFullyQualifiedFunctionName(
                         self.instance_config.function_details.tenant,

[pulsar] 07/13: [Broker] Fix create partitioned topic in replicated namespace (#10963)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c019c2a113a555c25fd65a6a450649ba25d8a1b4
Author: ran <ga...@126.com>
AuthorDate: Tue Jun 22 07:31:19 2021 +0800

    [Broker] Fix create partitioned topic in replicated namespace (#10963)
    
    Fixes https://github.com/apache/pulsar/issues/10673 Bug-2
    
    ### Motivation
    
    Currently, creating a partitioned topic in a replicated namespace does not create the metadata path `/managed-ledgers` on the replicated clusters.
    
    ### Modifications
    
    Add a new flag `createLocalTopicOnly` to indicate whether the partitioned topic path should be created only in the local cluster.
    If the flag is false, make remote calls to create the partitioned topic on the replicated clusters as well.
    
    
    (cherry picked from commit 2f8c175f52a1731f794c741f8fc3347b680191eb)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 120 ++++++++++++++-------
 .../broker/admin/v1/NonPersistentTopics.java       |  15 +--
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  15 +--
 .../broker/admin/v2/NonPersistentTopics.java       |   6 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |   5 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  27 ++---
 .../pulsar/broker/service/ReplicatorTest.java      |  62 ++++++++++-
 .../pulsar/client/admin/internal/TopicsImpl.java   |   8 +-
 9 files changed, 188 insertions(+), 72 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 8806497..530b350 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -619,7 +620,8 @@ public abstract class AdminResource extends PulsarWebResource {
         return topicPartitions;
     }
 
-    protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
+    protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions,
+                                                  boolean createLocalTopicOnly) {
         Integer maxTopicsPerNamespace = null;
 
         try {
@@ -672,55 +674,57 @@ public abstract class AdminResource extends PulsarWebResource {
                     "Number of partitions should be less than or equal to " + maxPartitions));
             return;
         }
+
+        List<CompletableFuture<Void>> createFutureList = new ArrayList<>();
+
+        CompletableFuture<Void> createLocalFuture = new CompletableFuture<>();
+        createFutureList.add(createLocalFuture);
         checkTopicExistsAsync(topicName).thenAccept(exists -> {
             if (exists) {
                 log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
                 asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
-            } else {
-
-                try {
-                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
-                    namespaceResources().getPartitionedTopicResources()
-                            .createAsync(path, new PartitionedTopicMetadata(numPartitions)).thenAccept(r -> {
-                                log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-                                tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
-                                    log.info("[{}] Successfully created partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    asyncResponse.resume(Response.noContent().build());
-                                }).exceptionally(e -> {
-                                    log.error("[{}] Failed to create partitions for topic {}", clientAppId(),
-                                            topicName);
-                                    // The partitioned topic is created but there are some partitions create failed
-                                    asyncResponse.resume(new RestException(e));
-                                    return null;
-                                });
-                            }).exceptionally(ex -> {
-                                if (ex.getCause() instanceof AlreadyExistsException) {
-                                    log.warn("[{}] Failed to create already existing partitioned topic {}",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(
-                                            new RestException(Status.CONFLICT, "Partitioned topic already exists"));
-                                } else if (ex.getCause() instanceof BadVersionException) {
-                                    log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
-                                            clientAppId(), topicName);
-                                    asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
-                                } else {
-                                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName,
-                                            ex.getCause());
-                                    asyncResponse.resume(new RestException(ex.getCause()));
-                                }
-                                return null;
-                            });
-                } catch (Exception e) {
-                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-                    resumeAsyncResponseExceptionally(asyncResponse, e);
-                }
+                return;
             }
+
+            provisionPartitionedTopicPath(asyncResponse, numPartitions, createLocalTopicOnly)
+                    .thenCompose(ignored -> tryCreatePartitionsAsync(numPartitions))
+                    .whenComplete((ignored, ex) -> {
+                        if (ex != null) {
+                            createLocalFuture.completeExceptionally(ex);
+                            return;
+                        }
+                        createLocalFuture.complete(null);
+                    });
         }).exceptionally(ex -> {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
             resumeAsyncResponseExceptionally(asyncResponse, ex);
             return null;
         });
+
+        if (!createLocalTopicOnly && topicName.isGlobal() && isNamespaceReplicated(namespaceName)) {
+            getNamespaceReplicatedClusters(namespaceName)
+                    .stream()
+                    .filter(cluster -> !cluster.equals(pulsar().getConfiguration().getClusterName()))
+                    .forEach(cluster -> createFutureList.add(
+                            ((TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster).topics())
+                                    .createPartitionedTopicAsync(
+                                            topicName.getPartitionedTopicName(), numPartitions, true)));
+        }
+
+        FutureUtil.waitForAll(createFutureList).whenComplete((ignored, ex) -> {
+            if (ex != null) {
+                log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName, ex.getCause());
+                if (ex.getCause() instanceof RestException) {
+                    asyncResponse.resume(ex.getCause());
+                } else {
+                    resumeAsyncResponseExceptionally(asyncResponse, ex.getCause());
+                }
+                return;
+            }
+            log.info("[{}] Successfully created partitions for topic {} in cluster {}",
+                    clientAppId(), topicName, pulsar().getConfiguration().getClusterName());
+            asyncResponse.resume(Response.noContent().build());
+        });
     }
 
     /**
@@ -747,6 +751,42 @@ public abstract class AdminResource extends PulsarWebResource {
                 });
     }
 
+    private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyncResponse,
+                                                                  int numPartitions,
+                                                                  boolean createLocalTopicOnly) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
+        namespaceResources()
+                .getPartitionedTopicResources()
+                .createAsync(partitionedTopicPath, new PartitionedTopicMetadata(numPartitions))
+                .whenComplete((ignored, ex) -> {
+                    if (ex != null) {
+                        if (ex instanceof AlreadyExistsException) {
+                            if (createLocalTopicOnly) {
+                                future.complete(null);
+                                return;
+                            }
+                            log.warn("[{}] Failed to create already existing partitioned topic {}",
+                                    clientAppId(), topicName);
+                            future.completeExceptionally(
+                                    new RestException(Status.CONFLICT, "Partitioned topic already exists"));
+                        } else if (ex instanceof BadVersionException) {
+                            log.warn("[{}] Failed to create partitioned topic {}: concurrent modification",
+                                    clientAppId(), topicName);
+                            future.completeExceptionally(
+                                    new RestException(Status.CONFLICT, "Concurrent modification"));
+                        } else {
+                            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
+                            future.completeExceptionally(new RestException(ex.getCause()));
+                        }
+                        return;
+                    }
+                    log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
+                    future.complete(null);
+                });
+        return future;
+    }
+
     protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
         if (throwable instanceof WebApplicationException) {
             asyncResponse.resume((WebApplicationException) throwable);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 20fd24d..daf6ea0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -142,14 +142,17 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal"
                     + " to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic already exist")})
-    public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse,
-                                       @PathParam("property") String property, @PathParam("cluster") String cluster,
-                                       @PathParam("namespace") String namespace, @PathParam("topic") @Encoded
-                                               String encodedTopic,
-                                       int numPartitions) {
+    public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 46708dd..babff0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -154,14 +154,17 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 406, message = "The number of partitions should be "
                     + "more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
             @ApiResponse(code = 409, message = "Partitioned topic already exist")})
-    public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse,
-                                       @PathParam("property") String property, @PathParam("cluster") String cluster,
-                                       @PathParam("namespace") String namespace, @PathParam("topic") @Encoded
-                                                   String encodedTopic,
-                                       int numPartitions) {
+    public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index cd6b31c..9d29967 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -188,12 +188,12 @@ public class NonPersistentTopics extends PersistentTopics {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic",
                     required = true, type = "int", defaultValue = "0")
-                    int numPartitions) {
-
+                    int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
         try {
             validateGlobalNamespaceOwnership(tenant, namespace);
             validateTopicName(tenant, namespace, encodedTopic);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9772757..56e5ce1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -231,12 +231,13 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic",
                     required = true, type = "int", defaultValue = "0")
-                    int numPartitions) {
+                    int numPartitions,
+            @QueryParam("createLocalTopicOnly") @DefaultValue("false") boolean createLocalTopicOnly) {
         try {
             validateGlobalNamespaceOwnership(tenant, namespace);
             validatePartitionedTopicName(tenant, namespace, encodedTopic);
             validateTopicPolicyOperation(topicName, PolicyName.PARTITION, PolicyOperation.WRITE);
-            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions, createLocalTopicOnly);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
             resumeAsyncResponseExceptionally(asyncResponse, e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index db6caca..81ec0d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -785,7 +785,7 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
         response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5);
+        persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5, false);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 39a99d2..a1c7276 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -169,7 +170,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         // 3) Create the partitioned topic
         response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
@@ -289,7 +290,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         // 3) Create the partitioned topic
         AsyncResponse response  = mock(AsyncResponse.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 1);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 1, true);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
@@ -375,7 +376,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         doReturn(new Policies()).when(persistentTopics).getNamespacePolicies(any());
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true);
         verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
         Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
     }
@@ -399,7 +400,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, false, 10);
@@ -428,7 +429,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         // 3) create partitioned topic and unload
         response = mock(AsyncResponse.class);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         response = mock(AsyncResponse.class);
@@ -458,13 +459,13 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
         response = mock(AsyncResponse.class);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
-        nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3);
+        nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
@@ -494,7 +495,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     public void testCreateExistedPartition() {
         final AsyncResponse response = mock(AsyncResponse.class);
         final String topicName = "test-create-existed-partition";
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 3);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 3, true);
 
         final String partitionName = TopicName.get(topicName).getPartition(0).getLocalName();
         try {
@@ -513,7 +514,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final int numPartitions = 5;
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+        persistentTopics.createPartitionedTopic(
+                response, testTenant, testNamespace, partitionedTopicName, numPartitions, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
@@ -553,7 +555,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final int numPartitions = 5;
         AsyncResponse response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+        persistentTopics.createPartitionedTopic(
+                response, testTenant, testNamespace, partitionedTopicName, numPartitions, true);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         String role = "role";
@@ -596,7 +599,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         // create partitioned topic and compaction on it
         response = mock(AsyncResponse.class);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 2, true);
         persistentTopics.compact(response, testTenant, testNamespace, partitionTopicName, true);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -613,7 +616,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
                 topicName).toString();
         final String subscriptionName = "sub";
 
-        admin.topics().createPartitionedTopic(topic, 3);
+        ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, 3, true).get();
 
         final String partitionedTopic = topic + "-partition-0";
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index d261e85..0ea39aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
@@ -35,6 +36,7 @@ import io.netty.buffer.ByteBuf;
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.SortedSet;
@@ -62,6 +64,7 @@ import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -79,7 +82,6 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -1121,6 +1123,64 @@ public class ReplicatorTest extends ReplicatorTestBase {
 
         consumer.close();
     }
+
+
+    @Test
+    public void createPartitionedTopicTest() throws Exception {
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String cluster3 = pulsar3.getConfig().getClusterName();
+        final String namespace = newUniqueName("pulsar/ns");
+
+        final String persistentPartitionedTopic =
+                newUniqueName("persistent://" + namespace + "/partitioned");
+        final String persistentNonPartitionedTopic =
+                newUniqueName("persistent://" + namespace + "/non-partitioned");
+        final String nonPersistentPartitionedTopic =
+                newUniqueName("non-persistent://" + namespace + "/partitioned");
+        final String nonPersistentNonPartitionedTopic =
+                newUniqueName("non-persistent://" + namespace + "/non-partitioned");
+        final int numPartitions = 3;
+
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2, cluster3));
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
+
+        admin1.topics().createPartitionedTopic(persistentPartitionedTopic, numPartitions);
+        admin1.topics().createPartitionedTopic(nonPersistentPartitionedTopic, numPartitions);
+        admin1.topics().createNonPartitionedTopic(persistentNonPartitionedTopic);
+        admin1.topics().createNonPartitionedTopic(nonPersistentNonPartitionedTopic);
+
+        List<String> partitionedTopicList = admin1.topics().getPartitionedTopicList(namespace);
+        Assert.assertTrue(partitionedTopicList.contains(persistentPartitionedTopic));
+        Assert.assertTrue(partitionedTopicList.contains(nonPersistentPartitionedTopic));
+
+        // expected topic list didn't contain non-persistent-non-partitioned topic,
+        // because this model topic didn't create path in local metadata store.
+        List<String> expectedTopicList = Lists.newArrayList(
+                persistentNonPartitionedTopic, nonPersistentNonPartitionedTopic);
+        TopicName pt = TopicName.get(persistentPartitionedTopic);
+        for (int i = 0; i < numPartitions; i++) {
+            expectedTopicList.add(pt.getPartition(i).toString());
+        }
+
+        checkListContainExpectedTopic(admin1, namespace, expectedTopicList);
+        checkListContainExpectedTopic(admin2, namespace, expectedTopicList);
+        checkListContainExpectedTopic(admin3, namespace, expectedTopicList);
+    }
+
+    private void checkListContainExpectedTopic(PulsarAdmin admin, String namespace, List<String> expectedTopicList) {
+        // wait non-partitioned topics replicators created finished
+        final List<String> list = new ArrayList<>();
+        Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> {
+            list.clear();
+            list.addAll(admin.topics().getList(namespace));
+            return list.size() == expectedTopicList.size();
+        });
+        for (String expectTopic : expectedTopicList) {
+            Assert.assertTrue(list.contains(expectTopic));
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
 
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 267a2a0..0dcf7fa 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -384,9 +384,15 @@ public class TopicsImpl extends BaseResource implements Topics {
 
     @Override
     public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
+        return createPartitionedTopicAsync(topic, numPartitions, false);
+    }
+
+    public CompletableFuture<Void> createPartitionedTopicAsync(
+            String topic, int numPartitions, boolean createLocalTopicOnly) {
         checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
         TopicName tn = validateTopic(topic);
-        WebTarget path = topicPath(tn, "partitions");
+        WebTarget path = topicPath(tn, "partitions")
+                .queryParam("createLocalTopicOnly", Boolean.toString(createLocalTopicOnly));
         return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
     }
 

[pulsar] 03/13: [Security] Upgrade vertx to 3.9.8 to address CVE-2019-17640 (#10889)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit be1eedc24a9c1ddc2c52cf9b7514e29d581892fd
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Jun 21 08:54:07 2021 +0300

    [Security] Upgrade vertx to 3.9.8 to address CVE-2019-17640 (#10889)
    
    (cherry picked from commit bce9144fb3c6c444d441dcf5527450ab4caed0f0)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 9 +++++----
 pom.xml                                          | 2 +-
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index cc44004..6408cf7 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -503,10 +503,11 @@ The Apache Software License, Version 2.0
   * JCTools - Java Concurrency Tools for the JVM
     - org.jctools-jctools-core-2.1.2.jar
   * Vertx
-    - io.vertx-vertx-auth-common-3.5.4.jar
-    - io.vertx-vertx-bridge-common-3.5.4.jar
-    - io.vertx-vertx-core-3.5.4.jar
-    - io.vertx-vertx-web-3.5.4.jar
+    - io.vertx-vertx-auth-common-3.9.8.jar
+    - io.vertx-vertx-bridge-common-3.9.8.jar
+    - io.vertx-vertx-core-3.9.8.jar
+    - io.vertx-vertx-web-3.9.8.jar
+    - io.vertx-vertx-web-common-3.9.8.jar
   * Apache ZooKeeper
     - org.apache.zookeeper-zookeeper-3.6.3.jar
     - org.apache.zookeeper-zookeeper-jute-3.6.3.jar
diff --git a/pom.xml b/pom.xml
index 833f3f9..a38204f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,7 +115,7 @@ flexible messaging model and an intuitive client API.</description>
     <jersey.version>2.34</jersey.version>
     <athenz.version>1.10.9</athenz.version>
     <prometheus.version>0.5.0</prometheus.version>
-    <vertx.version>3.5.4</vertx.version>
+    <vertx.version>3.9.8</vertx.version>
     <rocksdb.version>6.10.2</rocksdb.version>
     <slf4j.version>1.7.25</slf4j.version>
     <commons.collections.version>3.2.2</commons.collections.version>

[pulsar] 10/13: [Python] Fixed import when AvroSchema is not being used (#11034)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a5e643cf6bf8b9b3da229d3986fdf83ba81c5122
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 24 04:53:55 2021 +0200

    [Python] Fixed import when AvroSchema is not being used (#11034)
    
    ### Motivation
    
    Fixes #10929
    
    Since by default we're not marking `fastavro` as a dependency, we shouldn't fail when the dependency
    is not there, unless a user is really trying to use `AvroSchema`, in which case we should give a useful error message.
    
    (cherry picked from commit b4fa411e775b1cb7c21395fab83a16e60938d147)
---
 pulsar-client-cpp/python/pulsar/schema/__init__.py |  3 +-
 pulsar-client-cpp/python/pulsar/schema/schema.py   | 27 ---------
 .../python/pulsar/schema/schema_avro.py            | 67 ++++++++++++++++++++++
 3 files changed, 69 insertions(+), 28 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/schema/__init__.py b/pulsar-client-cpp/python/pulsar/schema/__init__.py
index a38513f..150629d 100644
--- a/pulsar-client-cpp/python/pulsar/schema/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/schema/__init__.py
@@ -20,4 +20,5 @@
 from .definition import Record, Field, Null, Boolean, Integer, Long, \
             Float, Double, Bytes, String, Array, Map
 
-from .schema import Schema, BytesSchema, StringSchema, JsonSchema, AvroSchema
+from .schema import Schema, BytesSchema, StringSchema, JsonSchema
+from .schema_avro import AvroSchema
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py b/pulsar-client-cpp/python/pulsar/schema/schema.py
index d0da91a..083efc3 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema.py
@@ -20,9 +20,7 @@
 
 from abc import abstractmethod
 import json
-import fastavro
 import _pulsar
-import io
 import enum
 
 
@@ -95,28 +93,3 @@ class JsonSchema(Schema):
 
     def decode(self, data):
         return self._record_cls(**json.loads(data))
-
-
-class AvroSchema(Schema):
-    def __init__(self, record_cls):
-        super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO,
-                                         record_cls.schema(), 'AVRO')
-        self._schema = record_cls.schema()
-
-    def _get_serialized_value(self, x):
-        if isinstance(x, enum.Enum):
-            return x.name
-        else:
-            return x
-
-    def encode(self, obj):
-        self._validate_object_type(obj)
-        buffer = io.BytesIO()
-        m = {k: self._get_serialized_value(v) for k, v in obj.__dict__.items()}
-        fastavro.schemaless_writer(buffer, self._schema, m)
-        return buffer.getvalue()
-
-    def decode(self, data):
-        buffer = io.BytesIO(data)
-        d = fastavro.schemaless_reader(buffer, self._schema)
-        return self._record_cls(**d)
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
new file mode 100644
index 0000000..2afa9db
--- /dev/null
+++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import _pulsar
+import io
+import enum
+
+from .schema import Schema
+
+try:
+    import fastavro
+    HAS_AVRO = True
+except ModuleNotFoundError:
+    HAS_AVRO = False
+
+if HAS_AVRO:
+    class AvroSchema(Schema):
+        def __init__(self, record_cls):
+            super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO,
+                                             record_cls.schema(), 'AVRO')
+            self._schema = record_cls.schema()
+
+        def _get_serialized_value(self, x):
+            if isinstance(x, enum.Enum):
+                return x.name
+            else:
+                return x
+
+        def encode(self, obj):
+            self._validate_object_type(obj)
+            buffer = io.BytesIO()
+            m = {k: self._get_serialized_value(v) for k, v in obj.__dict__.items()}
+            fastavro.schemaless_writer(buffer, self._schema, m)
+            return buffer.getvalue()
+
+        def decode(self, data):
+            buffer = io.BytesIO(data)
+            d = fastavro.schemaless_reader(buffer, self._schema)
+            return self._record_cls(**d)
+
+else:
+    class AvroSchema(Schema):
+        def __init__(self, _record_cls):
+            raise Exception("Avro library support was not found. Make sure to install Pulsar client " +
+                            "with Avro support: pip3 install 'pulsar-client[avro]'")
+
+        def encode(self, obj):
+            pass
+
+        def decode(self, data):
+            pass

[pulsar] 04/13: Change the nar package name for pulsar-io-kafka-connect-adaptor (#10976)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fc27d0ee66b5a0585c25b72524fc11ae6f431f29
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Jun 21 15:24:25 2021 +0800

    Change the nar package name for pulsar-io-kafka-connect-adaptor (#10976)
    
    * Change the nar package name for pulsar-io-kafka-connect-adaptor
    
    We changed the nar package name in #9808, which broke the website download link.
    The reported issue is https://issues.apache.org/jira/browse/PULSAR-16
    
    * Fix tests.
    
    (cherry picked from commit 16667d8a7f461a5b57426fc9de68fde033892f8e)
---
 distribution/io/src/assemble/io.xml         | 2 +-
 pulsar-io/kafka-connect-adaptor-nar/pom.xml | 3 +++
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml
index 7aaeff3..3408919 100644
--- a/distribution/io/src/assemble/io.xml
+++ b/distribution/io/src/assemble/io.xml
@@ -57,7 +57,7 @@
     <file><source>${basedir}/../../pulsar-io/data-generator/target/pulsar-io-data-generator-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/aerospike/target/pulsar-io-aerospike-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/elastic-search/target/pulsar-io-elastic-search-${project.version}.nar</source></file>
-    <file><source>${basedir}/../../pulsar-io/kafka-connect-adaptor-nar/target/pulsar-io-kafka-connect-adaptor-nar-${project.version}.nar</source></file>
+    <file><source>${basedir}/../../pulsar-io/kafka-connect-adaptor-nar/target/pulsar-io-kafka-connect-adaptor-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/hbase/target/pulsar-io-hbase-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/kinesis/target/pulsar-io-kinesis-${project.version}.nar</source></file>
     <file><source>${basedir}/../../pulsar-io/hdfs2/target/pulsar-io-hdfs2-${project.version}.nar</source></file>
diff --git a/pulsar-io/kafka-connect-adaptor-nar/pom.xml b/pulsar-io/kafka-connect-adaptor-nar/pom.xml
index 28a4d69..08bb0a6 100644
--- a/pulsar-io/kafka-connect-adaptor-nar/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor-nar/pom.xml
@@ -44,6 +44,9 @@
       <plugin>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-nar-maven-plugin</artifactId>
+        <configuration>
+          <finalName>pulsar-io-kafka-connect-adaptor-${project.version}</finalName>
+        </configuration>
       </plugin>
     </plugins>
   </build>

[pulsar] 01/13: fix parseMessageMetadata error cause by not skip broker entry metadata (#10968)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9c3e904ae56b660ca7b0f7b201effcc884654fd5
Author: Aloys <lo...@gmail.com>
AuthorDate: Sat Jun 19 02:00:48 2021 +0800

    fix parseMessageMetadata error cause by not skip broker entry metadata (#10968)
    
    Fixes #10967
    
    ### Motivation
    Fix the parseMessageMetadata error caused by not skipping the broker entry metadata.
    
    ### Modifications
    
    Skip the broker entry metadata, if it exists, before parsing the message metadata.
    
    (cherry picked from commit 0774b5fddddc0c9fe9b7cc00ae40e43322690ef1)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  2 -
 .../broker/service/BrokerEntryMetadataE2ETest.java | 20 +++++++++
 .../org/apache/pulsar/client/impl/MessageImpl.java |  2 -
 .../apache/pulsar/client/impl/MessageImplTest.java | 51 ++++++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |  5 ++-
 5 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4a03cd1..76211b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2451,8 +2451,6 @@ public class PersistentTopicsBase extends AdminResource {
         PositionImpl pos = (PositionImpl) entry.getPosition();
         ByteBuf metadataAndPayload = entry.getDataBuffer();
 
-        // moves the readerIndex to the payload
-        Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
         MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
 
         ResponseBuilder responseBuilder = Response.ok();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 5cbaf3d..e7d98a8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -109,4 +109,24 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
         Assert.assertEquals(messages.size(), 1);
         Assert.assertEquals(messages.get(0).getData(), "hello".getBytes());
     }
+
+    @Test(timeOut = 20000)
+    public void testGetLastMessageId() throws Exception {
+        final String topic = "persistent://prop/ns-abc/topic-test";
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        producer.newMessage().value("hello".getBytes()).send();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName(subscription)
+                .subscribe();
+        consumer.getLastMessageId();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index efb66e5..c9370f3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -300,8 +300,6 @@ public class MessageImpl<T> implements Message<T> {
         @SuppressWarnings("unchecked")
         MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get();
 
-        Commands.skipBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);
-
         Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata);
         msg.payload = headersAndPayloadWithBrokerEntryMetadata;
         msg.messageId = null;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 0a57d93..17b77a2 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -483,4 +483,55 @@ public class MessageImplTest {
             fail();
         }
     }
+
+    @Test(timeOut = 30000)
+    public void testParseMessageMetadataWithBrokerEntryMetadata() {
+        int MOCK_BATCH_SIZE = 10;
+        String data = "test-message";
+        ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+        byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+
+        // first, build a message with broker entry metadata
+
+        // build message metadata
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setPublishTime(1)
+                .setProducerName("test")
+                .setSequenceId(1);
+        byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+
+        // build broker entry metadata
+        BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata()
+                .setIndex(MOCK_BATCH_SIZE - 1);
+
+        // build final data which contains broker entry metadata
+        int brokerMetaSize = brokerMetadata.getSerializedSize();
+        ByteBuf  brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6);
+        brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+        brokerMeta.writeInt(brokerMetaSize);
+        brokerMetadata.writeTo(brokerMeta);
+
+        CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        compositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+        CompositeByteBuf dupCompositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+        dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf);
+
+        //second, parse message metadata without skip broker entry metadata
+        Commands.skipChecksumIfPresent(compositeByteBuf);
+        int metadataSize = (int) compositeByteBuf.readUnsignedInt();
+        MessageMetadata md = new MessageMetadata();
+        try {
+            md.parseFrom(compositeByteBuf, metadataSize);
+            Assert.fail("Parse operation should be failed.");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+
+        //third, parse message metadata with skip broker entry metadata first
+        MessageMetadata metadata = Commands.parseMessageMetadata(dupCompositeByteBuf);
+        assertEquals(metadata.getPublishTime(), 1);
+        assertEquals(metadata.getProducerName(), "test");
+        assertEquals(metadata.getSequenceId(), 1);
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 0c80395..7c93ecc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -418,6 +418,9 @@ public class Commands {
     }
 
     public static void parseMessageMetadata(ByteBuf buffer, MessageMetadata msgMetadata) {
+        // initially reader-index may point to start of broker entry metadata :
+        // increment reader-index to start_of_headAndPayload to parse metadata
+        skipBrokerEntryMetadataIfExist(buffer);
         // initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata
         // to parse metadata
         skipChecksumIfPresent(buffer);
@@ -1667,7 +1670,6 @@ public class Commands {
         try {
             // save the reader index and restore after parsing
             int readerIdx = metadataAndPayload.readerIndex();
-            skipBrokerEntryMetadataIfExist(metadataAndPayload);
             MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
             metadataAndPayload.readerIndex(readerIdx);
 
@@ -1682,7 +1684,6 @@ public class Commands {
     public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
         try {
             int readerIdx = metadataAndPayload.readerIndex();
-            skipBrokerEntryMetadataIfExist(metadataAndPayload);
             MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
             metadataAndPayload.readerIndex(readerIdx);
             if (metadata.hasOrderingKey()) {

[pulsar] 08/13: Fix potential data lost on the system topic when topic compaction have not triggered yet (#11003)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 72f391105af6b1cdd4f25ef66557806505f12df4
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 22 18:11:06 2021 +0800

    Fix potential data lost on the system topic when topic compaction have not triggered yet (#11003)
    
    Pre-create the subscription for the compactor to avoid losing any data. Since we are using a reader
    to read data from the __change_events topic, the data might be lost if there is no durable
    subscription on the topic. Since we already use topic compaction on the __change_events topic
    to reduce the topic policy cache recovery time,
    we can leverage the topic compaction cursor for retaining the data.
    
    (cherry picked from commit 94ec03111369e694f432ca219be77820648d2188)
---
 .../pulsar/broker/service/BrokerService.java       | 11 ++++-
 .../broker/service/persistent/PersistentTopic.java | 13 ++++--
 .../broker/service/persistent/SystemTopic.java     | 21 ++++++++-
 .../pulsar/broker/admin/TopicPoliciesTest.java     | 14 +++++-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  | 53 ++++++++++++----------
 5 files changed, 79 insertions(+), 33 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9c9d482..dc35db2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1223,8 +1223,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                                 PersistentTopic persistentTopic = isSystemTopic(topic)
                                         ? new SystemTopic(topic, ledger, BrokerService.this)
                                         : new PersistentTopic(topic, ledger, BrokerService.this);
+                                CompletableFuture<Void> preCreateSubForCompaction =
+                                        CompletableFuture.completedFuture(null);
+                                if (persistentTopic instanceof SystemTopic) {
+                                    preCreateSubForCompaction = ((SystemTopic) persistentTopic)
+                                            .preCreateSubForCompactionIfNeeded();
+                                }
                                 CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication();
-                                replicationFuture.thenCompose(v -> {
+                                FutureUtil.waitForAll(Lists.newArrayList(preCreateSubForCompaction, replicationFuture))
+                                .thenCompose(v -> {
                                     // Also check dedup status
                                     return persistentTopic.checkDeduplicationStatus();
                                 }).thenRun(() -> {
@@ -1255,7 +1262,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
                                     return null;
                                 });
-                            } catch (NamingException e) {
+                            } catch (NamingException | PulsarServerException e) {
                                 log.warn("Failed to create topic {}-{}", topic, e.getMessage());
                                 pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
                                 topicFuture.completeExceptionally(e);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 38b3264..8f7f356 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.carrotsearch.hppc.ObjectObjectHashMap;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -147,7 +148,6 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
-import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.utils.StatsOutputStream;
@@ -381,7 +381,7 @@ public class PersistentTopic extends AbstractTopic
     private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
             boolean replicated) {
         checkNotNull(compactedTopic);
-        if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) {
+        if (subscriptionName.equals(COMPACTION_SUBSCRIPTION)) {
             return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor);
         } else {
             return new PersistentSubscription(this, subscriptionName, cursor, replicated);
@@ -1369,7 +1369,7 @@ public class PersistentTopic extends AbstractTopic
 
                 long backlogEstimate = 0;
 
-                PersistentSubscription compactionSub = subscriptions.get(Compactor.COMPACTION_SUBSCRIPTION);
+                PersistentSubscription compactionSub = subscriptions.get(COMPACTION_SUBSCRIPTION);
                 if (compactionSub != null) {
                     backlogEstimate = compactionSub.estimateBacklogSize();
                 } else {
@@ -1396,6 +1396,13 @@ public class PersistentTopic extends AbstractTopic
         }
     }
 
+    /**
+     * Return if the topic has triggered compaction before or not.
+     */
+    protected boolean hasCompactionTriggered() {
+        return subscriptions.containsKey(COMPACTION_SUBSCRIPTION);
+    }
+
     CompletableFuture<Void> startReplicator(String remoteCluster) {
         log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
index 1b49dbe..788aa94 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java
@@ -19,14 +19,18 @@
 
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
+import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
 
 public class SystemTopic extends PersistentTopic {
 
     public SystemTopic(String topic, ManagedLedger ledger, BrokerService brokerService)
-            throws BrokerServiceException.NamingException {
+            throws BrokerServiceException.NamingException, PulsarServerException {
         super(topic, ledger, brokerService);
     }
 
@@ -54,4 +58,19 @@ public class SystemTopic extends PersistentTopic {
     public void checkGC() {
         // do nothing for system topic
     }
+
+    public CompletableFuture<Void> preCreateSubForCompactionIfNeeded() {
+        if (!super.hasCompactionTriggered()) {
+            // To pre-create the subscription for the compactor to avoid lost any data since we are using reader
+            // for reading data from the __change_events topic, if no durable subscription on the topic,
+            // the data might be lost. Since we are using the topic compaction on the __change_events topic
+            // to reduce the topic policy cache recovery time,
+            // so we can leverage the topic compaction cursor for retaining the data.
+            return super.createSubscription(COMPACTION_SUBSCRIPTION,
+                    CommandSubscribe.InitialPosition.Earliest, false)
+                    .thenCompose(__ -> CompletableFuture.completedFuture(null));
+        } else {
+            return CompletableFuture.completedFuture(null);
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 30a48b2..3d47212 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
@@ -2295,6 +2296,15 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         log.info("Backlog quota set success on topic: {}", testTopic);
 
         Awaitility.await()
+                .untilAsserted(() -> {
+                    TopicStats stats = admin.topics().getStats(topicPolicyEventsTopic);
+                    Assert.assertTrue(stats.getSubscriptions().containsKey("__compaction"));
+                });
+
+        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicPolicyEventsTopic);
+        long previousCompactedLedgerId = internalStats.compactedLedger.ledgerId;
+
+        Awaitility.await()
                 .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic)
                         .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota));
 
@@ -2302,8 +2312,8 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
 
         Awaitility.await()
                 .untilAsserted(() -> {
-                    TopicStats stats = admin.topics().getStats(topicPolicyEventsTopic);
-                    Assert.assertTrue(stats.getSubscriptions().containsKey("__compaction"));
+                    PersistentTopicInternalStats iStats = admin.topics().getInternalStats(topicPolicyEventsTopic);
+                    Assert.assertTrue(iStats.compactedLedger.ledgerId != previousCompactedLedgerId);
                 });
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 0d81459..aaa4618 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -85,11 +85,14 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, initPolicy).get();
 
         // Wait for all topic policies updated.
-        Thread.sleep(3000);
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(systemTopicBasedTopicPoliciesService
+                        .getPoliciesCacheInit(TOPIC1.getNamespaceObject())));
 
-        Assert.assertTrue(systemTopicBasedTopicPoliciesService.getPoliciesCacheInit(TOPIC1.getNamespaceObject()));
         // Assert broker is cache all topic policies
-        Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1).getMaxConsumerPerTopic().intValue(), 10);
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertEquals(systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1)
+                        .getMaxConsumerPerTopic().intValue(), 10));
 
         // Update policy for TOPIC1
         TopicPolicies policies1 = TopicPolicies.builder()
@@ -127,21 +130,21 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
                 .build();
         systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC6, policies6).get();
 
-        Thread.sleep(1000);
-
-        TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
-        TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2);
-        TopicPolicies policiesGet3 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3);
-        TopicPolicies policiesGet4 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4);
-        TopicPolicies policiesGet5 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5);
-        TopicPolicies policiesGet6 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6);
-
-        Assert.assertEquals(policiesGet1, policies1);
-        Assert.assertEquals(policiesGet2, policies2);
-        Assert.assertEquals(policiesGet3, policies3);
-        Assert.assertEquals(policiesGet4, policies4);
-        Assert.assertEquals(policiesGet5, policies5);
-        Assert.assertEquals(policiesGet6, policies6);
+        Awaitility.await().untilAsserted(() -> {
+            TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
+            TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2);
+            TopicPolicies policiesGet3 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC3);
+            TopicPolicies policiesGet4 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC4);
+            TopicPolicies policiesGet5 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC5);
+            TopicPolicies policiesGet6 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC6);
+
+            Assert.assertEquals(policiesGet1, policies1);
+            Assert.assertEquals(policiesGet2, policies2);
+            Assert.assertEquals(policiesGet3, policies3);
+            Assert.assertEquals(policiesGet4, policies4);
+            Assert.assertEquals(policiesGet5, policies5);
+            Assert.assertEquals(policiesGet6, policies6);
+        });
 
         // Remove reader cache will remove policies cache
         Assert.assertEquals(systemTopicBasedTopicPoliciesService.getPoliciesCacheSize(), 6);
@@ -164,13 +167,13 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         policies1.setMaxProducerPerTopic(106);
         systemTopicBasedTopicPoliciesService.updateTopicPoliciesAsync(TOPIC1, policies1);
 
-        Thread.sleep(1000);
-
         // reader for NAMESPACE1 will back fill the reader cache
-        policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
-        policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2);
-        Assert.assertEquals(policies1, policiesGet1);
-        Assert.assertEquals(policies2, policiesGet2);
+        Awaitility.await().untilAsserted(() -> {
+            TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC1);
+            TopicPolicies policiesGet2 = systemTopicBasedTopicPoliciesService.getTopicPolicies(TOPIC2);
+            Assert.assertEquals(policies1, policiesGet1);
+            Assert.assertEquals(policies2, policiesGet2);
+        });
 
         // Check reader cache is correct.
         Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE2)));
@@ -178,7 +181,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         Assert.assertTrue(systemTopicBasedTopicPoliciesService.checkReaderIsCached(NamespaceName.get(NAMESPACE3)));
 
         // Check get without cache
-        policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get();
+        TopicPolicies policiesGet1 = systemTopicBasedTopicPoliciesService.getTopicPoliciesBypassCacheAsync(TOPIC1).get();
         Assert.assertEquals(policies1, policiesGet1);
     }
 

[pulsar] 09/13: [Transaction] Fix broker init transaction related topic. (#11022)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 83a29ecc0332a9bc65a8e4474c27955accb1359f
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Jun 23 23:07:18 2021 +0800

    [Transaction] Fix broker init transaction related topic. (#11022)
    
    (cherry picked from commit e092fb33699c227c110a9c707ff6fcfedfd1ee7a)
---
 .../org/apache/pulsar/broker/PulsarService.java    |  14 +-
 .../service/persistent/PersistentSubscription.java |   7 +-
 .../broker/service/persistent/PersistentTopic.java |   7 +-
 .../pulsar/broker/transaction/TransactionTest.java | 142 +++++++++++++++++++++
 4 files changed, 157 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index cbd3eb3..0d01968 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -108,6 +109,7 @@ import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
 import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
+import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.validator.MultipleListenerValidator;
 import org.apache.pulsar.broker.web.WebService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -122,6 +124,7 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -990,7 +993,7 @@ public class PulsarService implements AutoCloseable {
             for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).join()) {
                 try {
                     TopicName topicName = TopicName.get(topic);
-                    if (bundle.includes(topicName)) {
+                    if (bundle.includes(topicName) && !isTransactionSystemTopic(topicName)) {
                         CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic);
                         if (future != null) {
                             persistentTopics.add(future);
@@ -1584,4 +1587,13 @@ public class PulsarService implements AutoCloseable {
         return workerConfig;
     }
 
+
+    private static boolean isTransactionSystemTopic(TopicName topicName) {
+        String topic = topicName.toString();
+        return topic.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString())
+                || topic.startsWith(TopicName.get(TopicDomain.persistent.value(),
+                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString())
+                || topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 16f2259..316ac10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -59,7 +59,6 @@ import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
-import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleDisabled;
 import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -77,7 +76,6 @@ import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -141,10 +139,7 @@ public class PersistentSubscription implements Subscription {
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
         this.setReplicated(replicated);
         if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
-                && !checkTopicIsEventsNames(TopicName.get(topicName))
-                && !topicName.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName())
-                && !topicName.startsWith(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX)
-                && !topicName.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
+                && !checkTopicIsEventsNames(TopicName.get(topicName))) {
             this.pendingAckHandle = new PendingAckHandleImpl(this);
         } else {
             this.pendingAckHandle = new PendingAckHandleDisabled();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8f7f356..3d5e10d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -107,7 +107,6 @@ import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
-import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
@@ -149,7 +148,6 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.utils.StatsOutputStream;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
@@ -317,10 +315,7 @@ public class PersistentTopic extends AbstractTopic
         checkReplicatedSubscriptionControllerState();
         TopicName topicName = TopicName.get(topic);
         if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
-                && !checkTopicIsEventsNames(topicName)
-                && !topicName.getEncodedLocalName().startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName())
-                && !topicName.getEncodedLocalName().startsWith(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX)
-                && !topicName.getEncodedLocalName().endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
+                && !checkTopicIsEventsNames(topicName)) {
             this.transactionBuffer = brokerService.getPulsar()
                     .getTransactionBufferProvider().newTransactionBuffer(this, transactionCompletableFuture);
         } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
new file mode 100644
index 0000000..ce8b3ae
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction;
+
+import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
+import com.google.common.collect.Sets;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Pulsar client transaction test.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class TransactionTest extends TransactionTestBase {
+
+    private static final String TENANT = "tnx";
+    private static final String NAMESPACE1 = TENANT + "/ns1";
+
+    @BeforeMethod
+    protected void setup() throws Exception {
+        this.setBrokerCount(1);
+        this.internalSetup();
+
+        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
+        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length - 1];
+        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build());
+        admin.tenants().createTenant(TENANT,
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NAMESPACE1);
+
+        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1);
+        pulsarClient.close();
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .enableTransaction(true)
+                .build();
+    }
+
+    @Test
+    public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
+        String subName = "test";
+
+        String topicName = TopicName.get(NAMESPACE1 + "/test").toString();
+
+
+        @Cleanup
+        Consumer<byte[]> consumer = getConsumer(topicName, subName);
+
+        consumer.close();
+
+        Awaitility.await().until(() -> {
+            try {
+                pulsarClient.newTransaction()
+                        .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
+            } catch (Exception e) {
+                return false;
+            }
+            return true;
+        });
+
+        admin.namespaces().unload(NamespaceName.SYSTEM_NAMESPACE.toString());
+        admin.namespaces().unload(NAMESPACE1);
+
+        @Cleanup
+        Consumer<byte[]> consumer1 = getConsumer(topicName, subName);
+
+        Awaitility.await().until(() -> {
+            try {
+                pulsarClient.newTransaction()
+                        .withTransactionTimeout(30, TimeUnit.SECONDS).build().get();
+            } catch (Exception e) {
+                return false;
+            }
+            return true;
+        });
+
+        ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
+                getPulsarServiceList().get(0).getBrokerService().getTopics();
+
+        Assert.assertNull(topics.get(TopicName.get(TopicDomain.persistent.value(),
+                NamespaceName.SYSTEM_NAMESPACE, TRANSACTION_LOG_PREFIX).toString() + 0));
+        Assert.assertNull(topics.get(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString()));
+        Assert.assertNull(topics.get(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topicName, subName)));
+    }
+
+
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    public Consumer<byte[]> getConsumer(String topicName, String subName) throws PulsarClientException {
+        return pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .subscriptionType(SubscriptionType.Shared)
+                .enableBatchIndexAcknowledgment(true)
+                .subscribe();
+    }
+}
\ No newline at end of file

[pulsar] 05/13: fix NoClassDefFoundError - io.airlift.compress.lz4.UnsafeUtil (#10983)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b7259cd988adf6e1edced5aacb1d9403bcee27d0
Author: newur <gi...@mailbox.org>
AuthorDate: Sun Jun 20 20:43:38 2021 +0200

    fix NoClassDefFoundError - io.airlift.compress.lz4.UnsafeUtil (#10983)
    
    * fix NoClassDefFoundError - io.airlift.compress.lz4.UnsafeUtil
    
    Reading a message fails with
    
    [org.apa.pul.cli.imp.ClientCnx] (pulsar-client-io-2-1) [localhost/127.0.0.1:6650] Got exception java.lang.NoClassDefFoundError: Could not initialize class io.airlift.compress.lz4.UnsafeUtil
    	at io.airlift.compress.lz4.Lz4RawDecompressor.decompress(Lz4RawDecompressor.java:60)
    	at org.apache.pulsar.common.compression.CompressionCodecLZ4.decode(CompressionCodecLZ4.java:91)
    
    * update aircompressor license file
    
    * update aircompressor license file
    
    (cherry picked from commit 2ea1ce66c04ad64d0adc85d21e6e369f0a0f325c)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 2 +-
 pom.xml                                          | 2 +-
 pulsar-sql/presto-distribution/LICENSE           | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 6408cf7..ddff724 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -419,7 +419,7 @@ The Apache Software License, Version 2.0
     - org.apache.httpcomponents-httpclient-4.5.13.jar
     - org.apache.httpcomponents-httpcore-4.4.13.jar
  * AirCompressor
-    - io.airlift-aircompressor-0.16.jar
+    - io.airlift-aircompressor-0.19.jar
  * AsyncHttpClient
     - org.asynchttpclient-async-http-client-2.12.1.jar
     - org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
diff --git a/pom.xml b/pom.xml
index a38204f..b25ef7d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,7 +166,7 @@ flexible messaging model and an intuitive client API.</description>
     <kafka.confluent.schemaregistryclient.version>5.3.0</kafka.confluent.schemaregistryclient.version>
     <kafka.confluent.avroserializer.version>5.3.0</kafka.confluent.avroserializer.version>
     <kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>
-    <aircompressor.version>0.16</aircompressor.version>
+    <aircompressor.version>0.19</aircompressor.version>
     <asynchttpclient.version>2.12.1</asynchttpclient.version>
     <jcommander.version>1.78</jcommander.version>
     <commons-lang3.version>3.11</commons-lang3.version>
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 33b5ab1..0d10346 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -274,7 +274,7 @@ The Apache Software License, Version 2.0
   * CGLIB Nodep
     - cglib-nodep-3.3.0.jar
   * Airlift
-    - aircompressor-0.16.jar
+    - aircompressor-0.19.jar
     - airline-0.8.jar
     - bootstrap-0.199.jar
     - bootstrap-0.195.jar
@@ -559,4 +559,4 @@ Bouncy Castle License
    - bcpkix-jdk15on-1.69.jar
    - bcprov-ext-jdk15on-1.69.jar
    - bcprov-jdk15on-1.69.jar
-   - bcutil-jdk15on-1.69.jar
\ No newline at end of file
+   - bcutil-jdk15on-1.69.jar

[pulsar] 02/13: [Security] Upgrade commons-codec to 1.15 (#10864)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1a2f820a913c791c7f342e5d1f1b58498cd33ce4
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Jun 21 08:54:30 2021 +0300

    [Security] Upgrade commons-codec to 1.15 (#10864)
    
    - contains security fix https://issues.apache.org/jira/browse/CODEC-270
      (since commons-codec 1.14)
    
    (cherry picked from commit 2925dc1597c4513b00738bfb69a8bb8ca7059af9)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 2 +-
 pom.xml                                          | 2 +-
 pulsar-sql/presto-distribution/LICENSE           | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 02559b6..cc44004 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -342,7 +342,7 @@ The Apache Software License, Version 2.0
     - com.yahoo.datasketches-sketches-core-0.8.3.jar
  * Apache Commons
     - commons-cli-commons-cli-1.2.jar
-    - commons-codec-commons-codec-1.10.jar
+    - commons-codec-commons-codec-1.15.jar
     - commons-collections-commons-collections-3.2.2.jar
     - commons-configuration-commons-configuration-1.10.jar
     - commons-io-commons-io-2.8.0.jar
diff --git a/pom.xml b/pom.xml
index 9087dc2..833f3f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,7 +172,7 @@ flexible messaging model and an intuitive client API.</description>
     <commons-lang3.version>3.11</commons-lang3.version>
     <commons-configuration.version>1.10</commons-configuration.version>
     <commons-io.version>2.8.0</commons-io.version>
-    <commons-codec.version>1.10</commons-codec.version>
+    <commons-codec.version>1.15</commons-codec.version>
     <javax.ws.rs-api.version>2.1</javax.ws.rs-api.version>
     <log4j.version>1.2.17</log4j.version>
     <hdrHistogram.version>2.1.9</hdrHistogram.version>
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index e61b8c9..33b5ab1 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -426,7 +426,7 @@ The Apache Software License, Version 2.0
     - codahale-metrics-provider-4.14.1.jar
   * Apache Commons
     - commons-cli-1.2.jar
-    - commons-codec-1.10.jar
+    - commons-codec-1.15.jar
     - commons-collections4-4.1.jar
     - commons-configuration-1.10.jar
     - commons-io-2.8.0.jar

[pulsar] 12/13: [C++] Fix Windows 32 bits compile and runtime failures (#11082)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 68889ef0f5095cf407ae7d36bcc253e64b2dcfda
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Jun 25 08:59:13 2021 +0800

    [C++] Fix Windows 32 bits compile and runtime failures (#11082)
    
    ### Motivation
    
    The C++ source code cannot be compiled successfully for the Windows 32-bit build. The compile error is:
    
    >  fatal error C1021: invalid preprocessor command 'warning'
    
    It's because `#warning` preprocessor directive is not supported by MSVC compilers.
    
    And even if the related code was removed and the build succeeded, an example producer program would crash. This bug was introduced by https://github.com/apache/pulsar/pull/6129. It's because the hardware CRC32 implementation on Windows is only provided for 64-bit programs. However, the `__cpuid` check in `crc32_initialize()` still returns true for a 32-bit build, so execution eventually ends up here:
    
    ```c++
    uint32_t crc32c(uint32_t init, const void *buf, size_t len, const chunk_config *config) {
        // SSE 4.2 extension for hw implementation are not present
        abort();
    }
    ```
    
    The `abort()` call will terminate the process.
    
    ### Modifications
    
    - Use `#pragma message()`  to replace `#warning` if the compiler is MSVC.
    - Fall back to the software implementation of the CRC32 checksum algorithm if the hardware implementation is not supported.
    - Add the workflow to build Windows 32-bit C++ library.
    - Fix the incorrect documentation about how to build it for Windows 32-bit.
    
    (cherry picked from commit c8fe1e89469228a59967b8716cada0fcf63315e1)
---
 .github/workflows/ci-cpp-build-windows.yaml    |  7 +++++++
 pulsar-client-cpp/README.md                    |  8 +++-----
 pulsar-client-cpp/lib/CMakeLists.txt           |  2 +-
 pulsar-client-cpp/lib/checksum/crc32c_sse42.cc | 12 +++++++++++-
 4 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/.github/workflows/ci-cpp-build-windows.yaml b/.github/workflows/ci-cpp-build-windows.yaml
index 6384968..c524fdc 100644
--- a/.github/workflows/ci-cpp-build-windows.yaml
+++ b/.github/workflows/ci-cpp-build-windows.yaml
@@ -52,6 +52,13 @@ jobs:
             suffix: 'windows-win64'
             generator: 'Visual Studio 16 2019'
             arch: '-A x64'
+          - name: 'Windows x86'
+            os: windows-latest
+            triplet: x86-windows
+            vcpkg_dir: 'C:\vcpkg'
+            suffix: 'windows-win32'
+            generator: 'Visual Studio 16 2019'
+            arch: '-A Win32'
 
     steps:
       - name: checkout
diff --git a/pulsar-client-cpp/README.md b/pulsar-client-cpp/README.md
index f489c3d..61f6dff 100644
--- a/pulsar-client-cpp/README.md
+++ b/pulsar-client-cpp/README.md
@@ -189,13 +189,13 @@ ${PULSAR_PATH}/pulsar-client-cpp/perf/perfConsumer
 
 It's highly recommended to use `vcpkg` for C++ package management on Windows. It's easy to install and well supported by Visual Studio (2015/2017/2019) and CMake. See [here](https://github.com/microsoft/vcpkg#quick-start-windows) for quick start.
 
-Take 64 bits Windows as an example, you only need to run
+Take Windows 64-bit library as an example, you only need to run
 
 ```bash
 vcpkg install --feature-flags=manifests --triplet x64-windows
 ```
 
-> NOTE: The default triplet is `x86-windows`, see [here](https://github.com/microsoft/vcpkg/blob/master/docs/users/triplets.md) for more details.
+> NOTE: For Windows 32-bit library, change `x64-windows` to `x86-windows`, see [here](https://github.com/microsoft/vcpkg/blob/master/docs/users/triplets.md) for more details about the triplet concept in Vcpkg.
 
 The all dependencies, which are specified by [vcpkg.json](vcpkg.json), will be installed in `vcpkg_installed/` subdirectory,
 
@@ -214,10 +214,8 @@ cmake --build ./build --config Release
 
 Then all artifacts will be built into `build` subdirectory.
 
-> NOTE:
+> NOTE: For Windows 32-bit, you need to use `-A Win32` and `-DVCPKG_TRIPLET=x86-windows`.
 >
-> 1. Change `Release` to `Debug` if you want to build a debug version library.
-> 2. For 32 bits Windows, you need to use `-A Win32` and `-DVCPKG_TRIPLET=x32-windows`.
 
 #### Install dependencies manually
 
diff --git a/pulsar-client-cpp/lib/CMakeLists.txt b/pulsar-client-cpp/lib/CMakeLists.txt
index 2e540b2..96d7283 100644
--- a/pulsar-client-cpp/lib/CMakeLists.txt
+++ b/pulsar-client-cpp/lib/CMakeLists.txt
@@ -150,7 +150,7 @@ elseif(BUILD_STATIC_LIB)
     # Install regular libpulsar.a
     target_link_libraries(pulsarStatic ${COMMON_LIBS})
     install(TARGETS pulsarStatic DESTINATION lib)
-endif(LINK_STATIC)
+endif()
 
 if (BUILD_STATIC_LIB)
     install(TARGETS pulsarStatic DESTINATION lib)
diff --git a/pulsar-client-cpp/lib/checksum/crc32c_sse42.cc b/pulsar-client-cpp/lib/checksum/crc32c_sse42.cc
index 89349a7..5126ec9 100644
--- a/pulsar-client-cpp/lib/checksum/crc32c_sse42.cc
+++ b/pulsar-client-cpp/lib/checksum/crc32c_sse42.cc
@@ -19,18 +19,27 @@
 #if BOOST_VERSION >= 105500
 #include <boost/predef.h>
 #else
+#if _MSC_VER
+#pragma message("Boost version is < 1.55, disable CRC32C")
+#else
 #warning "Boost version is < 1.55, disable CRC32C"
 #endif
+#endif
 
 #include <assert.h>
 #include <stdlib.h>
+#include "lib/checksum/crc32c_sw.h"
 
 #if BOOST_ARCH_X86_64
 #include <nmmintrin.h>  // SSE4.2
 #include <wmmintrin.h>  // PCLMUL
 #else
+#ifdef _MSC_VER
+#pragma message("BOOST_ARCH_X86_64 is not defined, CRC32C will be disabled")
+#else
 #warning "BOOST_ARCH_X86_64 is not defined, CRC32C will be disabled"
 #endif
+#endif
 
 #ifdef _MSC_VER
 #include <intrin.h>
@@ -82,6 +91,7 @@ bool crc32c_initialize() {
         DEBUG_PRINTF1("has_pclmulqdq = %d\n", has_pclmulqdq);
         initialized = true;
     }
+
     return has_sse42;
 }
 
@@ -251,7 +261,7 @@ uint32_t crc32c(uint32_t init, const void *buf, size_t len, const chunk_config *
 
 uint32_t crc32c(uint32_t init, const void *buf, size_t len, const chunk_config *config) {
     // SSE 4.2 extension for hw implementation are not present
-    abort();
+    return crc32c_sw(init, buf, len);  // fallback to the software implementation
 }
 
 #endif

[pulsar] 11/13: Made the PulsarClusterMetadataTeardown deletes idempotent (#11042)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d23c5ed011f5f9d39834cbb2a9d700a4d9ed4b0e
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 24 18:58:37 2021 +0200

    Made the PulsarClusterMetadataTeardown deletes idempotent (#11042)
    
    ### Motivation
    
    If some of the z-nodes are not there anymore, the cluster teardown operation should not fail.
    
    Also changed the recursive delete operation to delete the children of the same node in parallel, to speed up the overall execution time.
    
    (cherry picked from commit 7efabc4909e3a4a70d56a8e790443f3b349a1a89)
---
 .../pulsar/PulsarClusterMetadataTeardown.java      | 27 +++++++++++++++-------
 .../cli/ClusterMetadataTearDownTest.java           |  3 +++
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
index 53fff01..c4c2c18 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -22,6 +22,8 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -31,6 +33,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
@@ -100,7 +103,7 @@ public class PulsarClusterMetadataTeardown {
         }
 
         for (String localZkNode : localZkNodes) {
-            deleteRecursively(metadataStore, "/" + localZkNode);
+            deleteRecursively(metadataStore, "/" + localZkNode).join();
         }
 
         if (arguments.configurationStore != null && arguments.cluster != null) {
@@ -108,18 +111,26 @@ public class PulsarClusterMetadataTeardown {
             @Cleanup
             MetadataStore configMetadataStore = MetadataStoreFactory.create(arguments.configurationStore,
                     MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
-            deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster);
+            deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join();
         }
 
         log.info("Cluster metadata for '{}' teardown.", arguments.cluster);
     }
 
-    private static void deleteRecursively(MetadataStore metadataStore, String path){
-        metadataStore.getChildren(path).join().forEach(child -> {
-            deleteRecursively(metadataStore, path + "/" + child);
-        });
-
-        metadataStore.delete(path, Optional.empty()).join();
+    private static CompletableFuture<Void> deleteRecursively(MetadataStore metadataStore, String path) {
+        return metadataStore.getChildren(path)
+                .thenCompose(children -> FutureUtil.waitForAll(
+                        children.stream()
+                                .map(child -> deleteRecursively(metadataStore, path + "/" + child))
+                                .collect(Collectors.toList())))
+                .thenCompose(__ -> metadataStore.exists(path))
+                .thenCompose(exists -> {
+                    if (exists) {
+                        return metadataStore.delete(path, Optional.empty());
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
     }
 
     private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java
index a60ecc2..8aeb4bf 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java
@@ -199,6 +199,9 @@ public class ClusterMetadataTearDownTest extends TestRetrySupport {
         }
         List<String> clusterNodes = configStore.getChildren( "/admin/clusters").join();
         assertFalse(clusterNodes.contains(pulsarCluster.getClusterName()));
+
+        // Try delete again, should not fail
+        PulsarClusterMetadataTeardown.main(args);
     }
 
     private long getNumOfLedgers() {

[pulsar] 06/13: Fix incorrect port of advertisedListener (#10961)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d7bff1e5aa3e4086fcb2d5c9b65507d597b00246
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Jun 22 10:40:47 2021 +0800

    Fix incorrect port of advertisedListener (#10961)
    
    Fixes #10951
    
    ### Motivation
    The advertisedListener has its own port, and now we have no way to obtain the port of TLS and non-TLS advertisedListener except by setting the listenerName through the client.
    Therefore, brokerServiceUrl and webServiceUrl do not return the address and port of the advertisedListener
    
    
    (cherry picked from commit 99c84c4cf029c4fa3cd3e6dfa47ed0cb5272bedc)
---
 .../pulsar/broker/ServiceConfigurationUtils.java      |  8 ++++++--
 .../validator/MultipleListenerValidatorTest.java      | 15 ++++++++++++---
 .../java/org/apache/pulsar/broker/PulsarService.java  | 12 ++++++------
 .../loadbalance/impl/ModularLoadManagerImpl.java      |  2 +-
 .../org/apache/pulsar/compaction/CompactorTool.java   |  8 ++++----
 .../org/apache/pulsar/broker/PulsarServiceTest.java   | 19 ++++++++++++-------
 6 files changed, 41 insertions(+), 23 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
index fe23e6b..6ce69c5 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
@@ -54,9 +54,13 @@ public class ServiceConfigurationUtils {
      * Get the address of Broker, first try to get it from AdvertisedAddress.
      * If it is not set, try to get the address set by advertisedListener.
      * If it is still not set, get it through InetAddress.getLocalHost().
+     * @param configuration
+     * @param ignoreAdvertisedListener Sometimes we can’t use the default key of AdvertisedListener,
+     *                                 setting it to true can ignore AdvertisedListener.
      * @return
      */
-    public static String getAppliedAdvertisedAddress(ServiceConfiguration configuration) {
+    public static String getAppliedAdvertisedAddress(ServiceConfiguration configuration,
+                                                     boolean ignoreAdvertisedListener) {
         Map<String, AdvertisedListener> result = MultipleListenerValidator
                 .validateAndAnalysisAdvertisedListener(configuration);
 
@@ -66,7 +70,7 @@ public class ServiceConfigurationUtils {
         }
 
         AdvertisedListener advertisedListener = result.get(configuration.getInternalListenerName());
-        if (advertisedListener != null) {
+        if (advertisedListener != null && !ignoreAdvertisedListener) {
             String address = advertisedListener.getBrokerServiceUrl().getHost();
             if (address != null) {
                 return address;
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
index 8928e82..f41ed92 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.validator;
 
+import java.net.InetAddress;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.testng.annotations.Test;
@@ -46,15 +47,23 @@ public class MultipleListenerValidatorTest {
         config.setBrokerServicePortTls(Optional.of(6651));
         config.setAdvertisedListeners("internal:pulsar://192.0.0.1:6660, internal:pulsar+ssl://192.0.0.1:6651");
         config.setInternalListenerName("internal");
-        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), "192.0.0.1");
+        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
+                "192.0.0.1");
+        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
+                InetAddress.getLocalHost().getCanonicalHostName());
 
         config = new ServiceConfiguration();
         config.setBrokerServicePortTls(Optional.of(6651));
         config.setAdvertisedAddress("192.0.0.2");
-        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), "192.0.0.2");
+        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
+                "192.0.0.2");
+        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
+                "192.0.0.2");
 
         config.setAdvertisedAddress(null);
-        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
+                ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null));
+        assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
                 ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null));
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index be85f54..cbd3eb3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -280,7 +280,7 @@ public class PulsarService implements AutoCloseable {
         PulsarConfigurationLoader.isComplete(config);
         // validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
         this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
-        this.advertisedAddress = ServiceConfigurationUtils.getAppliedAdvertisedAddress(config);
+        this.advertisedAddress = ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false);
         state = State.Init;
         // use `internalListenerName` listener as `advertisedAddress`
         this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
@@ -1361,7 +1361,7 @@ public class PulsarService implements AutoCloseable {
 
     protected String brokerUrl(ServiceConfiguration config) {
         if (config.getBrokerServicePort().isPresent()) {
-            return brokerUrl(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+            return brokerUrl(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
                     getBrokerListenPort().get());
         } else {
             return null;
@@ -1374,7 +1374,7 @@ public class PulsarService implements AutoCloseable {
 
     public String brokerUrlTls(ServiceConfiguration config) {
         if (config.getBrokerServicePortTls().isPresent()) {
-            return brokerUrlTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+            return brokerUrlTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
                     getBrokerListenPortTls().get());
         } else {
             return null;
@@ -1387,7 +1387,7 @@ public class PulsarService implements AutoCloseable {
 
     public String webAddress(ServiceConfiguration config) {
         if (config.getWebServicePort().isPresent()) {
-            return webAddress(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+            return webAddress(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
                     getListenPortHTTP().get());
         } else {
             return null;
@@ -1400,7 +1400,7 @@ public class PulsarService implements AutoCloseable {
 
     public String webAddressTls(ServiceConfiguration config) {
         if (config.getWebServicePortTls().isPresent()) {
-            return webAddressTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
+            return webAddressTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
                     getListenPortHTTPS().get());
         } else {
             return null;
@@ -1546,7 +1546,7 @@ public class PulsarService implements AutoCloseable {
 
         // worker talks to local broker
         String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
-                ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig));
+                brokerConfig.getAdvertisedAddress());
         workerConfig.setWorkerHostname(hostname);
         workerConfig.setPulsarFunctionsCluster(brokerConfig.getClusterName());
         // inherit broker authorization setting
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 6fd2884..5b7867e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -931,7 +931,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, Consumer<Noti
         List<Metrics> metrics = Lists.newArrayList();
         Map<String, String> dimensions = new HashMap<>();
 
-        dimensions.put("broker", ServiceConfigurationUtils.getAppliedAdvertisedAddress(conf));
+        dimensions.put("broker", ServiceConfigurationUtils.getAppliedAdvertisedAddress(conf, true));
         dimensions.put("metric", "loadBalancing");
 
         Metrics m = Metrics.create(dimensions);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index fe7a5b8..ee2b374 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -104,15 +104,15 @@ public class CompactorTool {
             log.info("Found `brokerServicePortTls` in configuration file. \n"
                     + "Will connect pulsar use TLS.");
             clientBuilder
-                    .serviceUrl(PulsarService.brokerUrlTls(ServiceConfigurationUtils
-                                    .getAppliedAdvertisedAddress(brokerConfig),
+                    .serviceUrl(PulsarService.brokerUrlTls(
+                            ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig, true),
                             brokerConfig.getBrokerServicePortTls().get()))
                     .allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection())
                     .tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());
 
         } else {
-            clientBuilder.serviceUrl(PulsarService.brokerUrl(ServiceConfigurationUtils
-                            .getAppliedAdvertisedAddress(brokerConfig),
+            clientBuilder.serviceUrl(PulsarService.brokerUrl(
+                    ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig, true),
                     brokerConfig.getBrokerServicePort().get()));
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
index 9da4aa7..1f80221 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java
@@ -106,17 +106,22 @@ public class PulsarServiceTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testAppliedAdvertised() throws Exception {
         useListenerName = true;
-        conf.setAdvertisedListeners("internal:pulsar://127.0.0.1, internal:pulsar+ssl://127.0.0.1");
+        conf.setAdvertisedListeners("internal:pulsar://127.0.0.1:6650, internal:pulsar+ssl://127.0.0.1:6651");
         conf.setInternalListenerName("internal");
         setup();
-
-        AssertJUnit.assertEquals(pulsar.getAdvertisedAddress(), "127.0.0.1");
+        assertEquals(pulsar.getAdvertisedAddress(), "127.0.0.1");
         assertNull(pulsar.getConfiguration().getAdvertisedAddress());
         assertEquals(conf, pulsar.getConfiguration());
-        assertEquals(pulsar.brokerUrlTls(conf), "pulsar+ssl://127.0.0.1:6651");
-        assertEquals(pulsar.brokerUrl(conf), "pulsar://127.0.0.1:6660");
-        assertEquals(pulsar.webAddress(conf), "http://127.0.0.1:8081");
-        assertEquals(pulsar.webAddressTls(conf), "https://127.0.0.1:8082");
+
+        cleanup();
+        resetConfig();
+        setup();
+        assertEquals(pulsar.getAdvertisedAddress(), "localhost");
+        assertEquals(conf, pulsar.getConfiguration());
+        assertEquals(pulsar.brokerUrlTls(conf), "pulsar+ssl://localhost:" + pulsar.getBrokerListenPortTls().get());
+        assertEquals(pulsar.brokerUrl(conf), "pulsar://localhost:" + pulsar.getBrokerListenPort().get());
+        assertEquals(pulsar.webAddress(conf), "http://localhost:" + pulsar.getWebService().getListenPortHTTP().get());
+        assertEquals(pulsar.webAddressTls(conf), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get());
     }
 
 }