Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/25 04:24:30 UTC

[pulsar] branch branch-2.8 updated (0734191 -> 50da9b2)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 0734191  [Transactions] Prevent NPE in case of closeAsync() without a successful execution of startAsync() (#10948)
     new 887f8ec  [pulsar-broker] Handle multiple topic creation for same topic-name in broker (#10847)
     new e7f2701  [Security] Upgrade caffeine to 2.9.1 (#10865)
     new 0ee56e2  [Security] Upgrade k8s client-java to 12.0.1 (#10866)
     new d14d593  [Security] Upgrade bouncycastle version to 1.69 (#10867)
     new 46a25f0  [Security] Exclude and remove freebuilder dependency (#10869)
     new 6f44d5c  Make Metadata Tests less Flaky (#10955)
     new 7787957  [C++] Fix Windows build issues about static library (#10956)
     new d809104  Fix NonRecoverableLedgerException when get last message ID by Reader (#10957)
     new 50da9b2  fix non-persistent topic get partitioned metadata error on discovery (#10806)

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/ci-cpp-build-windows.yaml        | 29 +++++++-
 distribution/server/src/assemble/LICENSE.bin.txt   | 17 +++--
 pom.xml                                            | 10 ++-
 .../broker/admin/impl/PersistentTopicsBase.java    |  1 +
 .../pulsar/broker/service/BrokerService.java       | 14 +++-
 .../apache/pulsar/broker/service/ServerCnx.java    | 15 +++-
 .../SystemTopicBasedTopicPoliciesService.java      | 12 ++++
 .../broker/service/persistent/PersistentTopic.java | 46 ++++++++-----
 .../broker/admin/AdminApiGetLastMessageIdTest.java | 45 +++++++++++-
 .../pulsar/broker/service/ReplicatorTest.java      | 80 ++++++++++++++++++++++
 pulsar-client-cpp/lib/CMakeLists.txt               | 17 +++--
 .../org/apache/pulsar/common/util/FutureUtil.java  | 31 +++++++++
 .../discovery/service/BrokerDiscoveryProvider.java |  2 +-
 .../discovery/service/BaseDiscoveryTestSetup.java  |  7 ++
 .../discovery/service/DiscoveryServiceTest.java    | 21 ++++++
 pulsar-metadata/pom.xml                            |  7 +-
 .../org/apache/pulsar/metadata/ZKSessionTest.java  | 23 ++++---
 pulsar-sql/presto-distribution/LICENSE             |  9 +--
 pulsar-zookeeper-utils/pom.xml                     |  4 ++
 .../ZkIsolatedBookieEnsemblePlacementPolicy.java   | 22 +++---
 20 files changed, 338 insertions(+), 74 deletions(-)

[pulsar] 03/09: [Security] Upgrade k8s client-java to 12.0.1 (#10866)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0ee56e2af6decfe28cb6b83e3ccab8e6abecc4e0
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Jun 18 09:03:30 2021 +0300

    [Security] Upgrade k8s client-java to 12.0.1 (#10866)
    
    ### Motivation
    
    - address security vulnerability CVE-2021-25738 which has been reported as https://github.com/kubernetes-client/java/issues/1698
    
    ### Modifications
    
    - upgrade kubernetes client-java to 12.0.1
    
    (cherry picked from commit 43f4e4446fb163db2659551e0084718623e85d05)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 6 +++---
 pom.xml                                          | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 949e88f..bf113ee 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -489,9 +489,9 @@ The Apache Software License, Version 2.0
   * @FreeBuilder
     - org.inferred-freebuilder-1.14.9.jar
   * Kubernetes Client
-    - io.kubernetes-client-java-12.0.0.jar
-    - io.kubernetes-client-java-api-12.0.0.jar
-    - io.kubernetes-client-java-proto-12.0.0.jar
+    - io.kubernetes-client-java-12.0.1.jar
+    - io.kubernetes-client-java-api-12.0.1.jar
+    - io.kubernetes-client-java-proto-12.0.1.jar
   * Dropwizard
     - io.dropwizard.metrics-metrics-core-3.2.5.jar
     - io.dropwizard.metrics-metrics-graphite-3.2.5.jar
diff --git a/pom.xml b/pom.xml
index f5c3ffe..207c509 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,7 +190,7 @@ flexible messaging model and an intuitive client API.</description>
     <jakarta.xml.bind.version>2.3.3</jakarta.xml.bind.version>
     <jakarta.validation.version>2.0.2</jakarta.validation.version>
     <jna.version>4.2.0</jna.version>
-    <kubernetesclient.version>12.0.0</kubernetesclient.version>
+    <kubernetesclient.version>12.0.1</kubernetesclient.version>
     <nsq-client.version>1.0</nsq-client.version>
     <cron-utils.version>9.1.3</cron-utils.version>
     <spring-context.version>5.3.1</spring-context.version>

[pulsar] 08/09: Fix NonRecoverableLedgerException when get last message ID by Reader (#10957)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d809104e9dc940a85f2d3ecf1b372bf2833ac0cc
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Jun 18 16:47:08 2021 +0800

    Fix NonRecoverableLedgerException when get last message ID by Reader (#10957)
    
    * Fix NonRecoverableLedgerException when get last message ID by Reader
    
    If a topic has only non-durable subscriptions, or the mark-delete positions of all durable subscriptions
    have reached the LAC, all ledgers except the current ledger are deleted.
    Because the current ledger may not contain any data, a NonRecoverableLedgerException occurs on the broker side.
    
    In this case, we should return the message ID (-1, -1).
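
A minimal sketch of the fallback described above, assuming only the JDK. `NonRecoverableLedgerSketchException`, `LastMessageIdSketch`, and `reply` are hypothetical names used for illustration; they stand in for the broker's `ManagedLedgerException.NonRecoverableLedgerException` and the `ServerCnx` response path and are not Pulsar APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for ManagedLedgerException.NonRecoverableLedgerException.
class NonRecoverableLedgerSketchException extends RuntimeException {
    NonRecoverableLedgerSketchException(String message) {
        super(message);
    }
}

class LastMessageIdSketch {

    /**
     * Maps the outcome of a batch-size lookup to a "ledgerId:entryId" reply.
     * A non-recoverable ledger error is treated as "topic has no data" and
     * answered with -1:-1; any other failure is reported as an error string.
     */
    static String reply(CompletableFuture<Integer> batchSizeFuture, long ledgerId, long entryId) {
        return batchSizeFuture.handle((batchSize, e) -> {
            if (e == null) {
                return ledgerId + ":" + entryId;
            }
            // Dependent stages wrap failures in CompletionException, so unwrap once.
            Throwable cause = (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
            if (cause instanceof NonRecoverableLedgerSketchException) {
                // Only the (empty) current ledger is left: report "no messages".
                return "-1:-1";
            }
            return "error: " + cause.getMessage();
        }).join();
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new NonRecoverableLedgerSketchException("ledger was trimmed"));
        System.out.println(reply(failed, 5, 9));
        System.out.println(reply(CompletableFuture.completedFuture(3), 5, 9));
    }
}
```

The sketch returns plain strings to stay self-contained; the actual commit writes a get-last-message-ID response or an error command to the connection instead.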
    
    (cherry picked from commit 80f42e20099abac4002e8f84bbdcaba7aa57a8bd)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  1 +
 .../apache/pulsar/broker/service/ServerCnx.java    | 15 +++++--
 .../SystemTopicBasedTopicPoliciesService.java      | 12 ++++++
 .../broker/service/persistent/PersistentTopic.java | 46 ++++++++++++++--------
 .../broker/admin/AdminApiGetLastMessageIdTest.java | 45 ++++++++++++++++++++-
 5 files changed, 99 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 97f4a8d..4a03cd1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1127,6 +1127,7 @@ public class PersistentTopicsBase extends AdminResource {
             }
             return topic.getInternalStats(metadata).get();
         } catch (Exception e) {
+            log.error("[{}] Failed to get internal stats for {}", clientAppId(), topicName, e);
             throw new RestException(Status.INTERNAL_SERVER_ERROR,
                     (e instanceof ExecutionException) ? e.getCause().getMessage() : e.getMessage());
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c446833..8006b89 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1694,9 +1694,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
         batchSizeFuture.whenComplete((batchSize, e) -> {
             if (e != null) {
-                ctx.writeAndFlush(Commands.newError(
-                        requestId, ServerError.MetadataError,
-                        "Failed to get batch size for entry " + e.getMessage()));
+                if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
+                    // in this case, all ledgers except the current ledger have been removed
+                    // and the current ledger does not contain any data
+                    ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId,
+                            -1, -1, partitionIndex, -1,
+                            markDeletePosition != null ? markDeletePosition.getLedgerId() : -1,
+                            markDeletePosition != null ? markDeletePosition.getEntryId() : -1));
+                } else {
+                    ctx.writeAndFlush(Commands.newError(
+                            requestId, ServerError.MetadataError,
+                            "Failed to get batch size for entry " + e.getMessage()));
+                }
             } else {
                 int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 8cbbde4..807da68 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -184,6 +184,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                 readerCaches.put(namespace, readerCompletableFuture);
                 readerCompletableFuture.whenComplete((reader, ex) -> {
                     if (ex != null) {
+                        log.error("[{}] Failed to create reader on __change_events topic", namespace, ex);
                         result.completeExceptionally(ex);
                     } else {
                         initPolicesCache(reader, result);
@@ -239,19 +240,30 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
     private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, CompletableFuture<Void> future) {
         reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {
             if (ex != null) {
+                log.error("[{}] Failed to check for more events on the system topic",
+                        reader.getSystemTopic().getTopicName(), ex);
                 future.completeExceptionally(ex);
                 readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
             }
             if (hasMore) {
                 reader.readNextAsync().whenComplete((msg, e) -> {
                     if (e != null) {
+                        log.error("[{}] Failed to read event from the system topic.",
+                                reader.getSystemTopic().getTopicName(), e);
                         future.completeExceptionally(e);
                         readerCaches.remove(reader.getSystemTopic().getTopicName().getNamespaceObject());
                     }
                     refreshTopicPoliciesCache(msg);
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Loop next event reading for system topic.",
+                                reader.getSystemTopic().getTopicName().getNamespaceObject());
+                    }
                     initPolicesCache(reader, future);
                 });
             } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Reach the end of the system topic.", reader.getSystemTopic().getTopicName());
+                }
                 future.complete(null);
                 policyCacheInitMap.computeIfPresent(
                         reader.getSystemTopic().getTopicName().getNamespaceObject(), (k, v) -> true);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index aa73813..38b3264 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
+import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
@@ -1940,22 +1941,35 @@ public class PersistentTopic extends AbstractTopic
                         ledgers.forEach(ledgerId -> {
                             CompletableFuture<Void> completableFuture = new CompletableFuture<>();
                             getLedgerMetadataFutures.add(completableFuture);
-                            brokerService.getPulsar().getBookKeeperClient()
-                                    .getLedgerMetadata(ledgerId)
-                                    .thenAccept(metadata -> {
-                                        LedgerInfo schemaLedgerInfo = new LedgerInfo();
-                                        schemaLedgerInfo.ledgerId = metadata.getLedgerId();
-                                        schemaLedgerInfo.entries = metadata.getLastEntryId() + 1;
-                                        schemaLedgerInfo.size = metadata.getLength();
-                                        if (includeLedgerMetadata) {
-                                            info.metadata = metadata.toSafeString();
-                                        }
-                                        stats.schemaLedgers.add(schemaLedgerInfo);
-                                        completableFuture.complete(null);
-                                    }).exceptionally(e -> {
-                                completableFuture.completeExceptionally(e);
-                                return null;
-                            });
+                            CompletableFuture<LedgerMetadata> metadataFuture = null;
+                            try {
+                                metadataFuture = brokerService.getPulsar().getBookKeeperClient()
+                                    .getLedgerMetadata(ledgerId);
+                            } catch (NullPointerException e) {
+                                // related to bookkeeper issue https://github.com/apache/bookkeeper/issues/2741
+                                if (log.isDebugEnabled()) {
+                                    log.debug("[{}] Failed to get ledger metadata for the schema ledger {}",
+                                            topic, ledgerId, e);
+                                }
+                            }
+                            if (metadataFuture != null) {
+                                metadataFuture.thenAccept(metadata -> {
+                                    LedgerInfo schemaLedgerInfo = new LedgerInfo();
+                                    schemaLedgerInfo.ledgerId = metadata.getLedgerId();
+                                    schemaLedgerInfo.entries = metadata.getLastEntryId() + 1;
+                                    schemaLedgerInfo.size = metadata.getLength();
+                                    if (includeLedgerMetadata) {
+                                        info.metadata = metadata.toSafeString();
+                                    }
+                                    stats.schemaLedgers.add(schemaLedgerInfo);
+                                    completableFuture.complete(null);
+                                }).exceptionally(e -> {
+                                    completableFuture.completeExceptionally(e);
+                                    return null;
+                                });
+                            } else {
+                                completableFuture.complete(null);
+                            }
                         });
                         FutureUtil.waitForAll(getLedgerMetadataFutures).thenRun(() -> {
                             schemaStoreLedgersFuture.complete(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
index b672dfd..533a2a5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiGetLastMessageIdTest.java
@@ -22,14 +22,19 @@ import com.google.common.collect.Sets;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
@@ -43,6 +48,8 @@ import java.lang.reflect.Field;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Mockito.doNothing;
@@ -218,4 +225,40 @@ public class AdminApiGetLastMessageIdTest extends MockedPulsarServiceBaseTest {
         Assert.assertTrue(((MessageIdImpl)id[0]).getLedgerId() > 0);
         Assert.assertEquals( 2 * numberOfMessages -1, ((MessageIdImpl)id[0]).getEntryId());
     }
+
+    /**
+     * If a topic has only non-durable subscriptions, or the mark-delete positions of all durable subscriptions
+     * have reached the LAC, all ledgers except the current ledger are deleted. Since the current ledger may not
+     * have any data, this test ensures that the get-last-message-ID API still works in this case.
+     *
+     * In this case, we should return the message ID (-1, -1).
+     */
+    @Test
+    public void testGetLastMessageIdWhenTopicWithoutData() throws Exception {
+        final String topic = "persistent://prop/ns-abc/testGetLastMessageIdWhenTopicWithoutData-" + UUID.randomUUID();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            producer.send("Message - " + i);
+        }
+        // To trigger the ledger rollover
+        admin.topics().unload(topic);
+        Topic topicRef = pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+        ((PersistentTopic) topicRef).getManagedLedger().trimConsumedLedgersInBackground(new CompletableFuture<>());
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertEquals(stats.ledgers.size(), 1);
+        });
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+        // This will call the get last message ID api.
+        boolean hasMessage = reader.hasMessageAvailable();
+        Assert.assertFalse(hasMessage);
+        MessageId messageId = admin.topics().getLastMessageId(topic);
+        Assert.assertEquals(messageId.toString(), "-1:-1:-1");
+    }
 }

[pulsar] 01/09: [pulsar-broker] Handle multiple topic creation for same topic-name in broker (#10847)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 887f8ec398fad0062fa04b945099afbb888b929e
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Thu Jun 17 23:07:34 2021 -0700

    [pulsar-broker] Handle multiple topic creation for same topic-name in broker (#10847)
    
    ### Motivation
    
    When the broker takes too long to load a topic and the load times out before the topic future completes, the broker keeps multiple instances of the topic open, does not clean up the timed-out topic, and fails to create the replicator producer on the successfully created topic with the error `repl-producer is already connected to topic`, which builds up a replication backlog.
    
    ```
    19:16:10.107 [pulsar-ordered-OrderedExecutor-5-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - Opening managed ledger myProp/global/myNs/persistent/myTopic
    :
    9:17:10.953 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl - [myProp/global/myNs/persistent/myTopic] Successfully initialize managed ledger
    :
    19:17:10.065 [pulsar-io-23-30] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.196.133.62:47278] Failed to create topic persistent://myProp/global/myNs/myTopic, producerId=382
    :
    19:17:10.954 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://myProp/global/myNs/myTopic][west1 -> west2] Starting open cursor for replicator
    :
    19:17:10.955 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.pulsar.broker.service.BrokerService - Created topic persistent://myProp/global/myNs/myTopic - dedup is disabled
    :
    19:17:51.532 [pulsar-ordered-OrderedExecutor-5-0] INFO  org.apache.pulsar.broker.service.BrokerService - Created topic persistent://myProp/global/myNs/myTopic - dedup is disabled
    :
    19:17:51.530 [pulsar-ordered-OrderedExecutor-5-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://myProp/global/myNs/myTopic][west1 -> west2] Starting open cursor for replicator
    :
    07:25:51.377 [pulsar-io-23-5] ERROR org.apache.pulsar.client.impl.ProducerImpl - [persistent://myProp/global/myNs/myTopic] [pulsar.repl.west1] Failed to create producer: Producer with name 'pulsarrepl.west1' is already connected to topic
    ```
    
    ### Modification
    - Stop the replicator for failed and timed-out topics
    - Clean up failed topics
    
    ### Result
    - The replicator producer for the topic is created successfully, which avoids building up a replication backlog
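
The cleanup described above can be sketched roughly as follows, assuming only the JDK. `TopicLoadCleanupSketch`, its `topics` map, and `onTopicLoaded` are hypothetical names for illustration, and topics are modelled as plain strings; a real broker would also stop the replicator producers before dropping the cache entry.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TopicLoadCleanupSketch {

    // Hypothetical topic cache: topic name -> future of the loaded topic.
    final ConcurrentHashMap<String, CompletableFuture<String>> topics = new ConcurrentHashMap<>();

    /** Creates a future that fails with a TimeoutException if nobody completes it in time. */
    static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor, long delay, TimeUnit unit) {
        CompletableFuture<T> future = new CompletableFuture<>();
        executor.schedule(() -> {
            // No-op when the future has already been completed.
            future.completeExceptionally(new TimeoutException("Future didn't finish within deadline"));
        }, delay, unit);
        return future;
    }

    /**
     * Called when a slow load finally produces a topic.
     * Returns true if the topic was published, false if the late result was discarded.
     */
    boolean onTopicLoaded(String name, CompletableFuture<String> topicFuture, String loadedTopic) {
        if (topicFuture.isCompletedExceptionally()) {
            // The deadline already failed this future: discard the late topic and
            // remove the stale entry so that a later request can retry the load.
            topics.remove(name, topicFuture);
            return false;
        }
        return topicFuture.complete(loadedTopic);
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            TopicLoadCleanupSketch broker = new TopicLoadCleanupSketch();
            CompletableFuture<String> topicFuture = futureWithDeadline(executor, 50, TimeUnit.MILLISECONDS);
            broker.topics.put("my-topic", topicFuture);
            try {
                topicFuture.get(); // blocks until the deadline fires
            } catch (ExecutionException e) {
                System.out.println("load failed: " + e.getCause().getMessage());
            }
            boolean published = broker.onTopicLoaded("my-topic", topicFuture, "late topic instance");
            System.out.println("published=" + published + ", cached=" + broker.topics.containsKey("my-topic"));
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Removing the entry with the two-argument `remove(name, topicFuture)` only drops the mapping if it still points at the timed-out future, so a newer load attempt registered under the same name is left untouched.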
    
    (cherry picked from commit 1447e6b1061babedc08901c44f16164bb4c4e2df)
---
 .../pulsar/broker/service/BrokerService.java       | 14 +++-
 .../pulsar/broker/service/ReplicatorTest.java      | 80 ++++++++++++++++++++++
 .../org/apache/pulsar/common/util/FutureUtil.java  | 31 +++++++++
 3 files changed, 122 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 575738a..9c9d482 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -970,7 +970,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     }
 
     private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
-        CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline();
+        CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.futureWithDeadline(executor());
 
         if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
             if (log.isDebugEnabled()) {
@@ -1233,8 +1233,16 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                                     long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
                                             - topicCreateTimeMs;
                                     pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
-                                    addTopicToStatsMaps(topicName, persistentTopic);
-                                    topicFuture.complete(Optional.of(persistentTopic));
+                                    if (topicFuture.isCompletedExceptionally()) {
+                                        log.warn("{} future is already completed with failure {}, closing the topic",
+                                                topic, FutureUtil.getException(topicFuture));
+                                        persistentTopic.stopReplProducers().whenComplete((v, exception) -> {
+                                            topics.remove(topic, topicFuture);
+                                        });
+                                    } else {
+                                        addTopicToStatsMaps(topicName, persistentTopic);
+                                        topicFuture.complete(Optional.of(persistentTopic));
+                                    }
                                 }).exceptionally((ex) -> {
                                     log.warn(
                                             "Replication or dedup check failed."
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index e5219d5..d261e85 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -34,11 +36,13 @@ import io.netty.buffer.ByteBuf;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.util.List;
+import java.util.Optional;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -52,6 +56,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
@@ -65,6 +70,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -1041,6 +1047,80 @@ public class ReplicatorTest extends ReplicatorTestBase {
         nonPersistentProducer2.close();
     }
 
+    @Test
+    public void testCleanupTopic() throws Exception {
+
+        final String cluster1 = pulsar1.getConfig().getClusterName();
+        final String cluster2 = pulsar2.getConfig().getClusterName();
+        final String namespace = "pulsar/ns-" + System.nanoTime();
+        final String topicName = "persistent://" + namespace + "/cleanTopic";
+        final String topicMlName = namespace + "/persistent/cleanTopic";
+        admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2));
+
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        long topicLoadTimeoutSeconds = 3;
+        config1.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
+        config2.setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
+
+        ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar1.getManagedLedgerClientFactory()
+                .getManagedLedgerFactory();
+        Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
+        ledgersField.setAccessible(true);
+        ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) ledgersField
+                .get(mlFactory);
+        CompletableFuture<ManagedLedgerImpl> mlFuture = new CompletableFuture<>();
+        ledgers.put(topicMlName, mlFuture);
+
+        try {
+            Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
+                    .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
+            fail("consumer should fail due to topic loading failure");
+        } catch (Exception e) {
+            // Ok
+        }
+
+        CompletableFuture<Optional<Topic>> topicFuture = null;
+        for (int i = 0; i < 5; i++) {
+            topicFuture = pulsar1.getBrokerService().getTopics().get(topicName);
+            if (topicFuture != null) {
+                break;
+            }
+            Thread.sleep(i * 1000);
+        }
+
+        try {
+            topicFuture.get();
+            fail("topic creation should fail");
+        } catch (Exception e) {
+            // Ok
+        }
+
+        final CompletableFuture<Optional<Topic>> timedOutTopicFuture = topicFuture;
+        // timeout topic future should be removed from cache
+        retryStrategically((test) -> pulsar1.getBrokerService().getTopic(topicName, false) != timedOutTopicFuture, 5,
+                1000);
+
+        assertNotEquals(timedOutTopicFuture, pulsar1.getBrokerService().getTopics().get(topicName));
+
+        try {
+            Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared)
+                    .subscriptionName("my-subscriber-name").subscribeAsync().get(100, TimeUnit.MILLISECONDS);
+            fail("consumer should fail due to topic loading failure");
+        } catch (Exception e) {
+            // Ok
+        }
+
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) mlFactory.open(topicMlName + "-2");
+        mlFuture.complete(ml);
+
+        Consumer<byte[]> consumer = client1.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+                .subscriptionType(SubscriptionType.Shared).subscribeAsync()
+                .get(2 * topicLoadTimeoutSeconds, TimeUnit.SECONDS);
+
+        consumer.close();
+    }
     private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);
 
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 7356950..0c3a0c0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -20,8 +20,10 @@ package org.apache.pulsar.common.util;
 
 import java.time.Duration;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -162,4 +164,33 @@ public class FutureUtil {
             return this;
         }
     }
+
+    public static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor, Long delay,
+            TimeUnit unit, Exception exp) {
+        CompletableFuture<T> future = new CompletableFuture<T>();
+        executor.schedule(() -> {
+            if (!future.isDone()) {
+                future.completeExceptionally(exp);
+            }
+        }, delay, unit);
+        return future;
+    }
+
+    public static <T> CompletableFuture<T> futureWithDeadline(ScheduledExecutorService executor) {
+        return futureWithDeadline(executor, 60000L, TimeUnit.MILLISECONDS,
+                new TimeoutException("Future didn't finish within deadline"));
+    }
+
+    public static <T> Optional<Throwable> getException(CompletableFuture<T> future) {
+        if (future != null && future.isCompletedExceptionally()) {
+            try {
+                future.get();
+            } catch (InterruptedException e) {
+                return Optional.ofNullable(e);
+            } catch (ExecutionException e) {
+                return Optional.ofNullable(e.getCause());
+            }
+        }
+        return Optional.empty();
+    }
 }
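
The `futureWithDeadline` helper added above follows a common pattern: hand out a future and schedule a task that fails it if nobody completes it in time. Below is a minimal, self-contained sketch of that pattern using only JDK classes. The class and method names (`DeadlineFutureSketch`, `withDeadline`, `demo`) are hypothetical and not part of Pulsar, and cancelling the timer once the future completes is a variation that the committed helper does not include.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration; not the Pulsar FutureUtil class.
class DeadlineFutureSketch {

    // Returns a future that fails with TimeoutException unless it is completed within the delay.
    static <T> CompletableFuture<T> withDeadline(ScheduledExecutorService executor, long delay, TimeUnit unit) {
        CompletableFuture<T> future = new CompletableFuture<>();
        ScheduledFuture<?> timer = executor.schedule(() -> {
            // No effect if the future has already been completed.
            future.completeExceptionally(new TimeoutException("Future didn't finish within deadline"));
        }, delay, unit);
        // Variation (assumption): drop the pending timer as soon as the future completes.
        future.whenComplete((value, error) -> timer.cancel(false));
        return future;
    }

    // Waits on a future that nobody completes and reports the failure cause.
    static String demo() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> future = withDeadline(executor, 50, TimeUnit.MILLISECONDS);
            future.join();
            return "completed";
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints TimeoutException
    }
}
```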

[pulsar] 06/09: Make Metadata Tests less Flaky (#10955)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6f44d5cf3af817a60ac72a2c15378f58fe688d32
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Jun 17 20:52:38 2021 +0200

    Make Metadata Tests less Flaky (#10955)
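
    The diff below swaps fixed `Thread.sleep` calls for Awaitility's
    `await().untilAsserted(...)`, which re-checks an assertion until it passes
    or a timeout elapses. The underlying idea can be sketched with plain JDK
    code. `PollUntil` and its parameters are hypothetical names used for
    illustration only; this is not Awaitility's implementation.

    ```java
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.BooleanSupplier;

    // Hypothetical illustration of poll-until-true with a timeout.
    class PollUntil {

        // Re-checks the condition every pollMillis until it holds or timeoutMillis elapses.
        static boolean await(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
            long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
            while (!condition.getAsBoolean()) {
                if (System.nanoTime() - deadline >= 0) {
                    return false; // timed out before the condition held
                }
                try {
                    Thread.sleep(pollMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            AtomicBoolean ready = new AtomicBoolean(false);
            new Thread(() -> ready.set(true)).start();
            System.out.println(await(ready::get, 5_000, 10)); // prints true
        }
    }
    ```

    Compared with a fixed sleep, the wait ends as soon as the condition holds,
    and a slow machine gets the full timeout instead of a too-short pause.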
    
    Co-authored-by: Enrico Olivelli <eo...@apache.org>
    (cherry picked from commit 21ea76c15531976326e7319e551646ab698f9bc7)
---
 pulsar-metadata/pom.xml                            |  7 ++++++-
 .../org/apache/pulsar/metadata/ZKSessionTest.java  | 23 ++++++++++++----------
 2 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index e18a75e..c61e41e 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -31,7 +31,6 @@
 
   <artifactId>pulsar-metadata</artifactId>
   <name>Pulsar Metadata</name>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
@@ -59,6 +58,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-server</artifactId>
     </dependency>
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
index f565a8f..24c4bd7 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java
@@ -26,7 +26,6 @@ import static org.testng.Assert.assertTrue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 
 import lombok.Cleanup;
 
@@ -40,6 +39,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
 public class ZKSessionTest extends BaseMetadataStoreTest {
@@ -61,10 +61,10 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
         assertEquals(e, SessionEvent.ConnectionLost);
 
         zks.start();
-        e = sessionEvents.poll(10, TimeUnit.SECONDS);
+        e = sessionEvents.poll(20, TimeUnit.SECONDS);
         assertEquals(e, SessionEvent.Reconnected);
 
-        e = sessionEvents.poll(1, TimeUnit.SECONDS);
+        e = sessionEvents.poll(5, TimeUnit.SECONDS);
         assertNull(e);
     }
 
@@ -130,9 +130,9 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
         e = sessionEvents.poll(10, TimeUnit.SECONDS);
         assertEquals(e, SessionEvent.SessionReestablished);
 
-        Thread.sleep(2_000);
-
-        assertFalse(lock.getLockExpiredFuture().isDone());
+        Awaitility.await().untilAsserted(() -> {
+            assertFalse(lock.getLockExpiredFuture().isDone());
+        });
 
         assertTrue(store.get(path).join().isPresent());
     }
@@ -171,7 +171,10 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
         e = sessionEvents.poll(10, TimeUnit.SECONDS);
         assertEquals(e, SessionEvent.SessionLost);
 
-        assertEquals(le1.getState(), LeaderElectionState.Leading);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(le1.getState(), LeaderElectionState.Leading);
+        });
+
         les = leaderElectionEvents.poll();
         assertNull(les);
 
@@ -180,9 +183,9 @@ public class ZKSessionTest extends BaseMetadataStoreTest {
         e = sessionEvents.poll(10, TimeUnit.SECONDS);
         assertEquals(e, SessionEvent.SessionReestablished);
 
-        Thread.sleep(2_000);
-
-        assertEquals(le1.getState(), LeaderElectionState.Leading);
+        Awaitility.await().untilAsserted(() -> {
+                    assertEquals(le1.getState(), LeaderElectionState.Leading);
+        });
         les = leaderElectionEvents.poll();
         assertNull(les);
 

[pulsar] 05/09: [Security] Exclude and remove freebuilder dependency (#10869)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 46a25f04d71ef9871c77bfb012e6f9b8e800371c
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Jun 18 08:38:18 2021 +0300

    [Security] Exclude and remove freebuilder dependency (#10869)
    
    ### Motivation
    
    [Freebuilder](https://github.com/inferred/FreeBuilder) is an annotation processor used in BookKeeper's StorageClientSettings interface:
    
    https://github.com/apache/bookkeeper/blob/16e8ba772bb5cf4c7546fb559bd9d455d4e42625/stream/clients/java/base/src/main/java/org/apache/bookkeeper/clients/config/StorageClientSettings.java#L27-L33
    
    The annotation processor is only needed at compile time.
    
    Sonatype IQ flags the Freebuilder library as vulnerable. Because Freebuilder is a transitive dependency, the Pulsar distribution is flagged as vulnerable as well.
    
    ### Additional context
    
    There's a separate issue in Bookkeeper to change the dependency to optional / compileOnly: https://github.com/apache/bookkeeper/issues/2732
    
    ### Modifications
    
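    Exclude the freebuilder library from the BookKeeper dependencies, and replace the code that used Guava classes shaded inside freebuilder (`org.inferred.freebuilder.shaded.com.google.common.collect.Sets`) with JDK equivalents.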
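
    As a rough sketch of the code replacement, a call such as
    `Sets.newHashSet(value.split(","))` can be expressed with JDK collections
    alone. `IsolationGroups` and `parse` are hypothetical names used only for
    this illustration; the generic `HashSet<>` is also an assumption, since
    the diff itself uses the raw type.

    ```java
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    // Hypothetical helper for illustration; not part of the Pulsar code base.
    class IsolationGroups {

        // Splits a comma-separated list into a set without any Guava dependency.
        static Set<String> parse(String commaSeparated) {
            return new HashSet<>(Arrays.asList(commaSeparated.split(",")));
        }

        public static void main(String[] args) {
            // Duplicates collapse because the result is a set.
            System.out.println(parse("group-a,group-b,group-a").size()); // prints 2
        }
    }
    ```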
    
    (cherry picked from commit 406770ceae11031a0b54a39255050ebc603f4976)
---
 distribution/server/src/assemble/LICENSE.bin.txt   |  2 --
 pom.xml                                            |  4 ++++
 pulsar-zookeeper-utils/pom.xml                     |  4 ++++
 .../ZkIsolatedBookieEnsemblePlacementPolicy.java   | 22 ++++++++--------------
 4 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 9db9943..02559b6 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -486,8 +486,6 @@ The Apache Software License, Version 2.0
     - org.apache.curator-curator-recipes-5.1.0.jar
   * Apache Yetus
     - org.apache.yetus-audience-annotations-0.5.0.jar
-  * @FreeBuilder
-    - org.inferred-freebuilder-1.14.9.jar
   * Kubernetes Client
     - io.kubernetes-client-java-12.0.1.jar
     - io.kubernetes-client-java-api-12.0.1.jar
diff --git a/pom.xml b/pom.xml
index 9d7f186..9087dc2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -469,6 +469,10 @@ flexible messaging model and an intuitive client API.</description>
             <groupId>org.codehaus.jackson</groupId>
             <artifactId>jackson-mapper-asl</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>org.inferred</groupId>
+            <artifactId>freebuilder</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
 
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml
index 0f592ab..30a751e 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-zookeeper-utils/pom.xml
@@ -64,6 +64,10 @@
           <groupId>org.apache.zookeeper</groupId>
           <artifactId>zookeeper</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.inferred</groupId>
+          <artifactId>freebuilder</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
 
diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
index e7f393a..9320296 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pulsar.zookeeper;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.util.HashedWheelTimer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -27,15 +30,14 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
-import org.apache.bookkeeper.common.util.JsonUtil;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.proto.BookieAddressResolver;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.configuration.Configuration;
@@ -44,22 +46,14 @@ import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.policies.data.BookieInfo;
 import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
+import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
-import org.inferred.freebuilder.shaded.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import io.netty.util.HashedWheelTimer;
-import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.proto.BookieAddressResolver;
-
-import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
-
 public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy
         implements Deserializer<BookiesRackConfiguration> {
     private static final Logger LOG = LoggerFactory.getLogger(ZkIsolatedBookieEnsemblePlacementPolicy.class);
@@ -210,10 +204,10 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
             String primaryIsolationGroupString = castToString(properties.getOrDefault(ISOLATION_BOOKIE_GROUPS, ""));
             String secondaryIsolationGroupString = castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, ""));
             if (!primaryIsolationGroupString.isEmpty()) {
-                pair.setLeft(Sets.newHashSet(primaryIsolationGroupString.split(",")));
+                pair.setLeft(new HashSet(Arrays.asList(primaryIsolationGroupString.split(","))));
             }
             if (!secondaryIsolationGroupString.isEmpty()) {
-                pair.setRight(Sets.newHashSet(secondaryIsolationGroupString.split(",")));
+                pair.setRight(new HashSet(Arrays.asList(secondaryIsolationGroupString.split(","))));
             }
         }
         return pair;

[pulsar] 09/09: fix non-persistent topic get partitioned metadata error on discovery (#10806)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 50da9b228695fcbe19f746152e45da1c67cc5b19
Author: Aloys <lo...@gmail.com>
AuthorDate: Fri Jun 18 14:09:06 2021 +0800

    fix non-persistent topic get partitioned metadata error on discovery (#10806)
    
    Fixes #10443
    
    ### Motivation
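    Fix the error returned when fetching partitioned-topic metadata for a non-persistent topic through the discovery service. The lookup path hard-coded the `persistent` domain; it now uses the topic's own domain.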
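
    The change in `BrokerDiscoveryProvider` comes down to building the
    metadata path from the topic's domain instead of the literal
    `"persistent"`. A minimal sketch of that path construction follows;
    `PartitionedTopicPath` and `of` are hypothetical names, and the root
    prefix is taken from the test paths in this commit.

    ```java
    // Hypothetical stand-in for the path construction; not the Pulsar implementation.
    class PartitionedTopicPath {

        static final String ROOT = "/admin/partitioned-topics";

        // Joins namespace, domain and encoded local name under the partitioned-topics root.
        static String of(String namespace, String domain, String encodedLocalName) {
            return String.join("/", ROOT, namespace, domain, encodedLocalName);
        }

        public static void main(String[] args) {
            // prints /admin/partitioned-topics/test/local/ns/persistent/my-topic-2
            System.out.println(of("test/local/ns", "persistent", "my-topic-2"));
            // prints /admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2
            System.out.println(of("test/local/ns", "non-persistent", "my-topic-2"));
        }
    }
    ```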
    
    
    
    
    (cherry picked from commit 859922942759aaa539fe7b0951a614bb75c71ea8)
---
 .../discovery/service/BrokerDiscoveryProvider.java  |  2 +-
 .../discovery/service/BaseDiscoveryTestSetup.java   |  7 +++++++
 .../discovery/service/DiscoveryServiceTest.java     | 21 +++++++++++++++++++++
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
index 7f3eb6c..9c9a095 100644
--- a/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
+++ b/pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/BrokerDiscoveryProvider.java
@@ -106,7 +106,7 @@ public class BrokerDiscoveryProvider implements Closeable {
         try {
             checkAuthorization(service, topicName, role, authenticationData);
             final String path = path(PARTITIONED_TOPIC_PATH_ZNODE,
-                    topicName.getNamespaceObject().toString(), "persistent", topicName.getEncodedLocalName());
+                    topicName.getNamespaceObject().toString(), topicName.getDomain().value(), topicName.getEncodedLocalName());
             // gets the number of partitions from the zk cache
             pulsarResources.getNamespaceResources().getPartitionedTopicResources().getAsync(path)
                     .thenAccept(metadata -> {
diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
index 8a0bd00..70fc26f 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/BaseDiscoveryTestSetup.java
@@ -80,4 +80,11 @@ public class BaseDiscoveryTestSetup {
                 && path.equals("/admin/partitioned-topics/test/local/ns/persistent/my-topic-2");
         });
     }
+
+    protected void simulateStoreErrorForNonPersistentTopic(String string, Code sessionexpired) {
+        mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
+            return op == MockZooKeeper.Op.GET
+                    && path.equals("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2");
+        });
+    }
 }
diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
index 73b3c41..802879f 100644
--- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
+++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
@@ -114,6 +114,27 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
         }
     }
 
+    @Test
+    public void testGetPartitionsMetadataForNonPersistentTopic() throws Exception {
+        TopicName topic1 = TopicName.get("non-persistent://test/local/ns/my-topic-1");
+
+        PartitionedTopicMetadata m = service.getDiscoveryProvider().getPartitionedTopicMetadata(service, topic1, "role", null)
+                .get();
+        assertEquals(m.partitions, 0);
+
+        // Simulate ZK error
+        simulateStoreErrorForNonPersistentTopic("/admin/partitioned-topics/test/local/ns/non-persistent/my-topic-2", Code.SESSIONEXPIRED);
+        TopicName topic2 = TopicName.get("non-persistent://test/local/ns/my-topic-2");
+        CompletableFuture<PartitionedTopicMetadata> future = service.getDiscoveryProvider()
+                .getPartitionedTopicMetadata(service, topic2, "role", null);
+        try {
+            future.get();
+            fail("Partition metadata lookup should have failed");
+        } catch (ExecutionException e) {
+            assertEquals(e.getCause().getClass(), MetadataStoreException.class);
+        }
+    }
+
     /**
      * It verifies: client connects to Discovery-service and receives discovery response successfully.
      *

[pulsar] 02/09: [Security] Upgrade caffeine to 2.9.1 (#10865)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e7f270125f5cdc30394ea6e64bf6801b47194dc0
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Jun 18 15:39:16 2021 +0300

    [Security] Upgrade caffeine to 2.9.1 (#10865)
    
    
    (cherry picked from commit acf41498b4df713d26f81084a1e2243f4f95cc94)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 2 +-
 pom.xml                                          | 2 +-
 pulsar-sql/presto-distribution/LICENSE           | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index d9874a0..949e88f 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -320,7 +320,7 @@ The Apache Software License, Version 2.0
      - com.fasterxml.jackson.jaxrs-jackson-jaxrs-json-provider-2.12.3.jar
      - com.fasterxml.jackson.module-jackson-module-jaxb-annotations-2.12.3.jar
      - com.fasterxml.jackson.module-jackson-module-jsonSchema-2.12.3.jar
- * Caffeine -- com.github.ben-manes.caffeine-caffeine-2.6.2.jar
+ * Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar
  * Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
  * Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-1.17.0.jar
  * Bitbucket -- org.bitbucket.b_c-jose4j-0.7.6.jar
diff --git a/pom.xml b/pom.xml
index 6ba40dc..f5c3ffe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -177,7 +177,7 @@ flexible messaging model and an intuitive client API.</description>
     <log4j.version>1.2.17</log4j.version>
     <hdrHistogram.version>2.1.9</hdrHistogram.version>
     <javax.servlet-api>3.1.0</javax.servlet-api>
-    <caffeine.version>2.6.2</caffeine.version>
+    <caffeine.version>2.9.1</caffeine.version>
     <java-semver.version>0.9.0</java-semver.version>
     <hppc.version>0.7.3</hppc.version>
     <spark-streaming_2.10.version>2.1.0</spark-streaming_2.10.version>
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 3f0bc09..d357995 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -362,7 +362,7 @@ The Apache Software License, Version 2.0
     - avro-1.10.2.jar
     - avro-protobuf-1.10.2.jar
   * Caffeine
-    - caffeine-2.6.2.jar
+    - caffeine-2.9.1.jar
   * Javax
     - javax.inject-1.jar
     - javax.servlet-api-3.1.0.jar

[pulsar] 04/09: [Security] Upgrade bouncycastle version to 1.69 (#10867)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d14d59368d8c19a9f126a27ab512ec87144648b1
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Jun 18 12:26:29 2021 +0300

    [Security] Upgrade bouncycastle version to 1.69 (#10867)
    
    
    (cherry picked from commit 9af9361506a67037a9b339279cafe0b72a70c14a)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 7 ++++---
 pom.xml                                          | 2 +-
 pulsar-sql/presto-distribution/LICENSE           | 7 ++++---
 3 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index bf113ee..9db9943 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -590,9 +590,10 @@ Creative Commons Attribution License
 
 Bouncy Castle License
  * Bouncy Castle -- licenses/LICENSE-bouncycastle.txt
-    - org.bouncycastle-bcpkix-jdk15on-1.68.jar
-    - org.bouncycastle-bcprov-ext-jdk15on-1.68.jar
-    - org.bouncycastle-bcprov-jdk15on-1.68.jar
+    - org.bouncycastle-bcpkix-jdk15on-1.69.jar
+    - org.bouncycastle-bcprov-ext-jdk15on-1.69.jar
+    - org.bouncycastle-bcprov-jdk15on-1.69.jar
+    - org.bouncycastle-bcutil-jdk15on-1.69.jar
 
 ------------------------
 
diff --git a/pom.xml b/pom.xml
index 207c509..9d7f186 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,7 +120,7 @@ flexible messaging model and an intuitive client API.</description>
     <slf4j.version>1.7.25</slf4j.version>
     <commons.collections.version>3.2.2</commons.collections.version>
     <log4j2.version>2.14.0</log4j2.version>
-    <bouncycastle.version>1.68</bouncycastle.version>
+    <bouncycastle.version>1.69</bouncycastle.version>
     <bouncycastlefips.version>1.0.2</bouncycastlefips.version>
     <jackson.version>2.12.3</jackson.version>
     <jackson.databind.version>2.12.3</jackson.databind.version>
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index d357995..e61b8c9 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -556,6 +556,7 @@ Creative Commons Attribution License
 
 Bouncy Castle License
  * Bouncy Castle -- licenses/LICENSE-bouncycastle.txt
--    - bcpkix-jdk15on-1.68.jar
--    - bcprov-ext-jdk15on-1.68.jar
--    - bcprov-jdk15on-1.68.jar
+   - bcpkix-jdk15on-1.69.jar
+   - bcprov-ext-jdk15on-1.69.jar
+   - bcprov-jdk15on-1.69.jar
+   - bcutil-jdk15on-1.69.jar
\ No newline at end of file

[pulsar] 07/09: [C++] Fix Windows build issues about static library (#10956)

Posted by pe...@apache.org.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 77879578f708e4648ecdf1cb734ef6e459a38275
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Jun 18 13:31:51 2021 +0800

    [C++] Fix Windows build issues about static library (#10956)
    
    ### Motivation
    
    On Windows, `.lib` files can be either **static libraries** or **import libraries**. For example, when we build an executable with dynamic linking, we link against `xxx.lib` at build time and load `xxx.dll` at run time. The `xxx.lib` here is not a static library but the import library associated with the dynamic library.
    
    Currently, if the C++ library is built on Windows in release mode, the following files will be generated under `<BUILD_DIR>\lib\Release`:
    - pulsar.dll
    - pulsar.lib
    
    However, the import library (`pulsar.lib`) associated with `pulsar.dll` is overwritten by the static library because they share the same name. To keep the import library, we must set the `-DBUILD_STATIC_LIB=OFF` option to disable building the static library, but the CMake configuration then fails on Windows:
    
    ```
    CMake Error at lib/CMakeLists.txt:100 (target_include_directories):
      Cannot specify include directories for target "pulsarStatic" which is not
      built by this project.
    ```
    
    ### Modifications
    
    - For the MSVC compiler on Windows, use a different name for the static library to avoid overwriting the import library.
    - Fix the CMake failure when `-DBUILD_STATIC_LIB=OFF` is configured.
    - Add a CI job that builds the C++ client without the static library on Windows.
    
    After this change, if the C++ library is built on Windows in release mode, the following files will be generated under `<BUILD_DIR>\lib\Release`:
    - pulsar.dll: the dynamic library
    - pulsar.lib: the import library associated with pulsar.dll
    - pulsar-static.lib: the static library
    
    (cherry picked from commit b827f6e3bb12a54fbb7ddaa07317540d024914e5)
---
 .github/workflows/ci-cpp-build-windows.yaml | 29 +++++++++++++++++++++++++++--
 pulsar-client-cpp/lib/CMakeLists.txt        | 17 ++++++++++-------
 2 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/.github/workflows/ci-cpp-build-windows.yaml b/.github/workflows/ci-cpp-build-windows.yaml
index a044949..6384968 100644
--- a/.github/workflows/ci-cpp-build-windows.yaml
+++ b/.github/workflows/ci-cpp-build-windows.yaml
@@ -73,7 +73,7 @@ jobs:
         run: |
           cd pulsar-client-cpp && vcpkg install --triplet ${{ matrix.triplet }}
 
-      - name: Configure
+      - name: Configure (default)
         if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
         shell: bash
         run: |
@@ -95,4 +95,29 @@ jobs:
           if [ "$RUNNER_OS" == "Windows" ]; then
             cd pulsar-client-cpp && \
             cmake --build ./build
-          fi
\ No newline at end of file
+          fi
+
+      - name: Configure (dynamic library only)
+        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+        shell: bash
+        run: |
+          if [ "$RUNNER_OS" == "Windows" ]; then
+            cd pulsar-client-cpp && \
+            cmake \
+              -B ./build-1 \
+              -G "${{ matrix.generator }}" ${{ matrix.arch }} \
+              -DBUILD_PYTHON_WRAPPER=OFF -DBUILD_TESTS=OFF \
+              -DVCPKG_TRIPLET=${{ matrix.triplet }} \
+              -DCMAKE_BUILD_TYPE=Release \
+              -DBUILD_STATIC_LIB=OFF \
+              -S .
+          fi
+
+      - name: Compile
+        if: ${{ steps.check_changes.outputs.docs_only != 'true' }}
+        shell: bash
+        run: |
+          if [ "$RUNNER_OS" == "Windows" ]; then
+            cd pulsar-client-cpp && \
+            cmake --build ./build-1
+          fi
diff --git a/pulsar-client-cpp/lib/CMakeLists.txt b/pulsar-client-cpp/lib/CMakeLists.txt
index a96e842..2e540b2 100644
--- a/pulsar-client-cpp/lib/CMakeLists.txt
+++ b/pulsar-client-cpp/lib/CMakeLists.txt
@@ -63,6 +63,10 @@ if (BUILD_DYNAMIC_LIB)
     set_property(TARGET pulsarShared PROPERTY OUTPUT_NAME ${LIB_NAME_SHARED})
     set_property(TARGET pulsarShared PROPERTY VERSION ${LIBRARY_VERSION})
     target_link_libraries(pulsarShared ${COMMON_LIBS} ${CMAKE_DL_LIBS})
+    if (MSVC)
+        target_include_directories(pulsarShared PRIVATE ${dlfcn-win32_INCLUDE_DIRS})
+        target_link_options(pulsarShared PRIVATE $<$<CONFIG:DEBUG>:/NODEFAULTLIB:MSVCRT>)
+    endif()
 endif()
 
 
@@ -87,17 +91,16 @@ endif()
 
 if (BUILD_STATIC_LIB)
     add_library(pulsarStatic STATIC $<TARGET_OBJECTS:PULSAR_OBJECT_LIB>)
-    set_property(TARGET pulsarStatic PROPERTY OUTPUT_NAME ${LIB_NAME})
+    if (MSVC)
+        set_property(TARGET pulsarStatic PROPERTY OUTPUT_NAME "${LIB_NAME}-static")
+        target_include_directories(pulsarStatic PRIVATE ${dlfcn-win32_INCLUDE_DIRS})
+    else ()
+        set_property(TARGET pulsarStatic PROPERTY OUTPUT_NAME ${LIB_NAME})
+    endif()
     set_property(TARGET pulsarStatic PROPERTY VERSION ${LIBRARY_VERSION})
     target_compile_definitions(pulsarStatic PRIVATE PULSAR_STATIC)
 endif()
 
-if (MSVC)
-    target_include_directories(pulsarStatic PRIVATE ${dlfcn-win32_INCLUDE_DIRS})
-    target_include_directories(pulsarShared PRIVATE ${dlfcn-win32_INCLUDE_DIRS})
-    target_link_options(pulsarShared PRIVATE $<$<CONFIG:DEBUG>:/NODEFAULTLIB:MSVCRT>)
-endif()
-
 # When linking statically, install a libpulsar.a that contains all the
 # required dependencies except ssl
 if (LINK_STATIC AND BUILD_STATIC_LIB)