You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/05 12:21:31 UTC

[pulsar] branch branch-2.8 updated (69a4dbb -> 916cb3b)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 69a4dbb  [Branch-2.8]Fix cherry-pick code style (#12627)
     new 9242aa1  Fix additional servlets nar might extract to null   directory (#12585)
     new 9bbcca2  [ISSUE-12291][Client]  'StartMessageId' and 'RollbackDuration' not working in MultiTopicsReader for non-partitioned topics (#12308)
     new ea40033  [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction (#12602)
     new 916cb3b  [Broker] Fix prefix setting in JWT authn and avoid multi calls for the getProperty (#12132)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   3 +-
 .../mledger/impl/NonDurableCursorImpl.java         |  10 ++
 .../AuthenticationProviderToken.java               |  43 ++++-----
 .../web/plugin/servlet/AdditionalServlets.java     |  21 +++--
 .../AuthenticationProviderTokenTest.java           |  41 +++++++++
 .../AbstractDispatcherSingleActiveConsumer.java    |  10 +-
 ...onPersistentDispatcherSingleActiveConsumer.java |   2 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |   4 +-
 .../pulsar/client/impl/MultiTopicsReaderTest.java  | 102 ++++++++++++++++++++-
 .../pulsar/compaction/CompactedTopicTest.java      |  73 +++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   2 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |   8 +-
 12 files changed, 274 insertions(+), 45 deletions(-)

[pulsar] 02/04: [ISSUE-12291][Client] 'StartMessageId' and 'RollbackDuration' not working in MultiTopicsReader for non-partitioned topics (#12308)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9bbcca272136f6d44ac9c8fd0ddebbf30e6333a8
Author: Jason918 <ja...@qq.com>
AuthorDate: Thu Nov 4 23:18:10 2021 +0800

    [ISSUE-12291][Client]  'StartMessageId' and 'RollbackDuration' not working in MultiTopicsReader for non-partitioned topics (#12308)
    
    Fixes #12291
    
    ### Motivation
    
    Bug fix. 'StartMessageId' and 'RollbackDuration' is not working in MultiTopicsReader for non-partitioned topics.
    
    ### Modifications
    
    This fix is quite simple. Just add `startMessageId` and `startMessageRollbackDurationInSec` when creating underlying consumer with `ConsumerImpl.newConsumerImpl`
    
    (cherry picked from commit cb48152254b8c16596e7251ef9a7229d918d2e90)
---
 .../pulsar/client/impl/MultiTopicsReaderTest.java  | 102 ++++++++++++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   2 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |   8 +-
 3 files changed, 106 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index 31a426e..a8a6ced 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -48,7 +48,6 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -378,6 +377,107 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
         Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0));
     }
 
+    @Test(timeOut = 20000)
+    public void testMultiNonPartitionedTopicWithStartMessageId() throws Exception {
+        final String topic1 = "persistent://my-property/my-ns/topic1" + UUID.randomUUID();
+        final String topic2 = "persistent://my-property/my-ns/topic2" + UUID.randomUUID();
+        List<String> topics = Arrays.asList(topic1, topic2);
+        PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+
+        // create producer and send msg
+        List<Producer<String>> producerList = new ArrayList<>();
+        for (String topicName : topics) {
+            producerList.add(pulsarClient.newProducer(Schema.STRING).topic(topicName).create());
+        }
+        int msgNum = 10;
+        Set<String> messages = new HashSet<>();
+        for (int i = 0; i < producerList.size(); i++) {
+            Producer<String> producer = producerList.get(i);
+            for (int j = 0; j < msgNum; j++) {
+                String msg = i + "msg" + j;
+                producer.send(msg);
+                messages.add(msg);
+            }
+        }
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .startMessageId(MessageId.earliest)
+                .topics(topics).readerName("my-reader").create();
+        // receive messages
+        while (reader.hasMessageAvailable()) {
+            messages.remove(reader.readNext(5, TimeUnit.SECONDS).getValue());
+        }
+        assertEquals(messages.size(), 0);
+        assertEquals(client.consumersCount(), 1);
+        // clean up
+        for (Producer<String> producer : producerList) {
+            producer.close();
+        }
+        reader.close();
+        Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0));
+    }
+
+    @Test(timeOut = 20000)
+    public void testMultiNonPartitionedTopicWithRollbackDuration() throws Exception {
+        final String topic1 = "persistent://my-property/my-ns/topic1" + UUID.randomUUID();
+        final String topic2 = "persistent://my-property/my-ns/topic2" + UUID.randomUUID();
+        List<String> topics = Arrays.asList(topic1, topic2);
+        PulsarClientImpl client = (PulsarClientImpl) pulsarClient;
+
+        // create producer and send msg
+        List<Producer<String>> producerList = new ArrayList<>();
+        for (String topicName : topics) {
+            producerList.add(pulsarClient.newProducer(Schema.STRING).topic(topicName).create());
+        }
+        int totalMsg = 10;
+        Set<String> messages = new HashSet<>();
+        long oldMsgPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(5); // 5 hours old
+        long newMsgPublishTime = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1); // 5 hours old
+        for (int i = 0; i < producerList.size(); i++) {
+            Producer<String> producer = producerList.get(i);
+            // (1) Publish 10 messages with publish-time 5 HOUR back
+            for (int j = 0; j < totalMsg; j++) {
+                TypedMessageBuilderImpl<String> msg = (TypedMessageBuilderImpl<String>) producer.newMessage()
+                        .value(i + "-old-msg-" + j);
+                msg.getMetadataBuilder()
+                        .setPublishTime(oldMsgPublishTime)
+                        .setProducerName(producer.getProducerName())
+                        .setReplicatedFrom("us-west1");
+                msg.send();
+                messages.add(msg.getMessage().getValue());
+            }
+            // (2) Publish 10 messages with publish-time 1 HOUR back
+            for (int j = 0; j < totalMsg; j++) {
+                TypedMessageBuilderImpl<String> msg = (TypedMessageBuilderImpl<String>) producer.newMessage()
+                        .value(i + "-new-msg-" + j);
+                msg.getMetadataBuilder()
+                        .setPublishTime(newMsgPublishTime)
+                        .setProducerName(producer.getProducerName())
+                        .setReplicatedFrom("us-west1");
+                msg.send();
+                messages.add(msg.getMessage().getValue());
+            }
+        }
+
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .startMessageFromRollbackDuration(2, TimeUnit.HOURS)
+                .topics(topics).readerName("my-reader").create();
+        // receive messages
+        while (reader.hasMessageAvailable()) {
+            messages.remove(reader.readNext(5, TimeUnit.SECONDS).getValue());
+        }
+        assertEquals(messages.size(), 2 * totalMsg);
+        for (String message : messages) {
+            assertTrue(message.contains("old-msg"));
+        }
+        assertEquals(client.consumersCount(), 1);
+        // clean up
+        for (Producer<String> producer : producerList) {
+            producer.close();
+        }
+        reader.close();
+        Awaitility.await().untilAsserted(() -> assertEquals(client.consumersCount(), 0));
+    }
+
     @Test(timeOut = 10000)
     public void testKeyHashRangeReader() throws Exception {
         final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ce4711d..e63effc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1880,7 +1880,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         if (lastDequeuedMessageId == MessageId.earliest) {
             // if we are starting from latest, we should seek to the actual last message first.
             // allow the last one to be read when read head inclusively.
-            if (startMessageId.equals(MessageId.latest)) {
+            if (MessageId.latest.equals(startMessageId)) {
 
                 CompletableFuture<GetLastMessageIdResponse> future = internalGetLastMessageIdAsync();
                 // if the consumer is configured to read inclusive then we need to seek to the last message
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 28f4548..0de4bcc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1001,8 +1001,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 } else {
                     ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
                             client.externalExecutorProvider(), -1,
-                            true, subFuture, null, schema, interceptors,
-                            createIfDoesNotExist);
+                            true, subFuture, startMessageId, schema, interceptors,
+                            createIfDoesNotExist, startMessageRollbackDurationInSec);
 
                     synchronized (pauseMutex) {
                         if (paused) {
@@ -1297,8 +1297,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
                                 client, partitionName, configurationData,
                                 client.externalExecutorProvider(),
-                                partitionIndex, true, subFuture, null, schema, interceptors,
-                                true /* createTopicIfDoesNotExist */);
+                                partitionIndex, true, subFuture, startMessageId, schema, interceptors,
+                                true /* createTopicIfDoesNotExist */, startMessageRollbackDurationInSec);
                         synchronized (pauseMutex) {
                             if (paused) {
                                 newConsumer.pause();

[pulsar] 01/04: Fix additional servlets nar might extract to null directory (#12585)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9242aa18465e8187e28ac3b40d584336db1b5199
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Thu Nov 4 21:01:24 2021 +0800

    Fix additional servlets nar might extract to null   directory (#12585)
    
    ### Motivation
    The additional servlets use NAR package to implantation plugin mechanism, it need extract to specific directory.
    
    However, the `narExtractionDirectory` is from `Properties`, but the properties has only the configuration in the
    configuration file. The default value of `narExtractionDirectory ` in `ServiceConfiguration` can't be use.
    
    ### Modifications
    When `narExtractionDirectory ` configuration is not set, use `NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR` as default directory.
    
    (cherry picked from commit 9ecd613c0fdaf7f5306fc7f633a4d12218a7a4d2)
---
 .../web/plugin/servlet/AdditionalServlets.java      | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java
index 2451cf5..080e1c7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/plugin/servlet/AdditionalServlets.java
@@ -21,13 +21,12 @@ package org.apache.pulsar.broker.web.plugin.servlet;
 import com.google.common.collect.ImmutableMap;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
+import org.apache.pulsar.common.nar.NarClassLoader;
 
 /**
  * A collection of loaded additional servlets.
@@ -71,18 +70,23 @@ public class AdditionalServlets implements AutoCloseable {
         if (additionalServlets == null) {
             additionalServlets = conf.getProperties().getProperty(PROXY_ADDITIONAL_SERVLETS);
         }
+
+        String narExtractionDirectory = conf.getProperties().getProperty(NAR_EXTRACTION_DIRECTORY);
+        if(narExtractionDirectory == null) {
+            narExtractionDirectory = NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+        }
+
         if (additionalServletDirectory == null || additionalServlets == null) {
             return null;
         }
 
         AdditionalServletDefinitions definitions =
                 AdditionalServletUtils.searchForServlets(additionalServletDirectory
-                        , null);
+                        , narExtractionDirectory);
         ImmutableMap.Builder<String, AdditionalServletWithClassLoader> builder = ImmutableMap.builder();
 
-        List<String> additionalServletsList = Arrays.asList(additionalServlets.split(","));
-        additionalServletsList.forEach(servletName -> {
-
+        String[] additionalServletsList = additionalServlets.split(",");
+        for (String servletName : additionalServletsList) {
             AdditionalServletMetadata definition = definitions.servlets().get(servletName);
             if (null == definition) {
                 throw new RuntimeException("No additional servlet is found for name `" + servletName
@@ -91,8 +95,7 @@ public class AdditionalServlets implements AutoCloseable {
 
             AdditionalServletWithClassLoader servletWithClassLoader;
             try {
-                servletWithClassLoader = AdditionalServletUtils.load(definition,
-                        conf.getProperties().getProperty(NAR_EXTRACTION_DIRECTORY));
+                servletWithClassLoader = AdditionalServletUtils.load(definition, narExtractionDirectory);
                 if (servletWithClassLoader != null) {
                     builder.put(servletName, servletWithClassLoader);
                 }
@@ -101,7 +104,7 @@ public class AdditionalServlets implements AutoCloseable {
                 log.error("Failed to load the additional servlet for name `" + servletName + "`", e);
                 throw new RuntimeException("Failed to load the additional servlet for name `" + servletName + "`");
             }
-        });
+        }
 
         Map<String, AdditionalServletWithClassLoader> servlets = builder.build();
         if (servlets != null && !servlets.isEmpty()) {

[pulsar] 03/04: [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction (#12602)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ea40033a5e456b147496269892c3cd77b0236aed
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Nov 5 00:55:59 2021 +0800

    [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction (#12602)
    
    * [Compaction] Do not move the non-durable cursor position when trimming ledgers while topic with compaction.
    
    For the non-durable cursor, the ledgers trimming task will cause skip the removed ledgers
    to avoid readers introduced backlogs and make sure the data can be removed if over the retention,
    more details to see #6787.
    
    But for a topic which enabled compaction, this will lead to the reader skips the compacted data.
    The new added test can illustrate this problem well. For reading compacted data, reading a message ID
    that earlier that the first message ID of the original data is a normal behavior, so we should not
    move forward the cursor which will read the compacted data.
    
    * Fix checkstyle.
    
    * Fix tests.
    
    * Fix tests.
    
    (cherry picked from commit a6b1b34a5c028b74bd44c5b8f32b42752b6cec14)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 +-
 .../mledger/impl/NonDurableCursorImpl.java         | 10 +++
 .../AbstractDispatcherSingleActiveConsumer.java    | 10 ++-
 ...onPersistentDispatcherSingleActiveConsumer.java |  2 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  4 +-
 .../pulsar/compaction/CompactedTopicTest.java      | 73 ++++++++++++++++++++++
 6 files changed, 95 insertions(+), 7 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 126acdb..fa62ebe 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2543,7 +2543,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             // move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
             // to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed
             if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
-                    && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) {
+                    && highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
+                    && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
                 cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
                     @Override
                     public void markDeleteComplete(Object ctx) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 15b1f04..0f7ffe41 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 
 public class NonDurableCursorImpl extends ManagedCursorImpl {
 
+    private volatile boolean readCompacted;
+
     NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
                          PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition) {
         super(bookkeeper, config, ledger, cursorName);
@@ -116,6 +118,14 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
         callback.deleteCursorComplete(ctx);
     }
 
+    public void setReadCompacted(boolean readCompacted) {
+        this.readCompacted = readCompacted;
+    }
+
+    public boolean isReadCompacted() {
+        return readCompacted;
+    }
+
     @Override
     public synchronized String toString() {
         return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 690a598..4c7ea45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -26,6 +26,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -45,7 +47,7 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
     protected boolean isKeyHashRangeFiltered = false;
     protected CompletableFuture<Void> closeFuture = null;
     protected final int partitionIndex;
-
+    protected final ManagedCursor cursor;
     // This dispatcher supports both the Exclusive and Failover subscription types
     protected final SubType subscriptionType;
 
@@ -59,12 +61,13 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
 
     public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
                                                   String topicName, Subscription subscription,
-                                                  ServiceConfiguration serviceConfig) {
+                                                  ServiceConfiguration serviceConfig, ManagedCursor cursor) {
         super(subscription, serviceConfig);
         this.topicName = topicName;
         this.consumers = new CopyOnWriteArrayList<>();
         this.partitionIndex = partitionIndex;
         this.subscriptionType = subscriptionType;
+        this.cursor = cursor;
         ACTIVE_CONSUMER_UPDATER.set(this, null);
     }
 
@@ -178,6 +181,9 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
                 consumer.notifyActiveConsumerChange(currentActiveConsumer);
             }
         }
+        if (cursor != null && !cursor.isDurable() && cursor instanceof NonDurableCursorImpl) {
+            ((NonDurableCursorImpl) cursor).setReadCompacted(ACTIVE_CONSUMER_UPDATER.get(this).readCompacted());
+        }
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 6094ab7..5cdbff1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -44,7 +44,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
                                                        NonPersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName(), subscription,
-                topic.getBrokerService().pulsar().getConfiguration());
+                topic.getBrokerService().pulsar().getConfiguration(), null);
         this.topic = topic;
         this.subscription = subscription;
         this.msgDrop = new Rate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 04b8f5a..1f6cbec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -58,7 +58,6 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         implements Dispatcher, ReadEntriesCallback {
 
     protected final PersistentTopic topic;
-    protected final ManagedCursor cursor;
     protected final String name;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
@@ -73,11 +72,10 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
     public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
                                                     PersistentTopic topic, Subscription subscription) {
         super(subscriptionType, partitionIndex, topic.getName(), subscription,
-                topic.getBrokerService().pulsar().getConfiguration());
+                topic.getBrokerService().pulsar().getConfiguration(), cursor);
         this.topic = topic;
         this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
                 : ""/* NonDurableCursor doesn't have name */);
-        this.cursor = cursor;
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.readFailureBackoff = new Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
             TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 608d99b..cbe7372 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -509,4 +509,77 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
         reader.close();
         producer.close();
     }
+
+    @Test
+    public void testReadCompactedDataWhenLedgerRolloverKickIn() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/testReadCompactedDataWhenLedgerRolloverKickIn-" +
+                UUID.randomUUID();
+        final int numMessages = 2000;
+        final int keys = 200;
+        final String msg = "Test";
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .maxPendingMessages(numMessages)
+                .enableBatching(false)
+                .create();
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i % keys + "").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+        // Send more 200 keys
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key((i % keys + keys) + "").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+
+        // Make sure we have more than 1 original ledgers
+        admin.topics().unload(topic);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(admin.topics().getInternalStats(topic).ledgers.size(), 2);
+        });
+
+        // Start a new reader to reading messages
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .receiverQueueSize(10)
+                .create();
+
+        // Send more 200 keys
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key((i % keys + keys * 2) + "").value(msg).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, keys * 3);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+        });
+
+        // The reader should read all 600 keys
+        int received = 0;
+        while (reader.hasMessageAvailable()) {
+            System.out.println(reader.readNext().getKey());
+            received++;
+        }
+        Assert.assertEquals(received, keys * 3);
+    }
 }

[pulsar] 04/04: [Broker] Fix prefix setting in JWT authn and avoid multi calls for the getProperty (#12132)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 916cb3b9c1e00b90fd2fab12e6572654edd6e845
Author: Zike Yang <ar...@armail.top>
AuthorDate: Fri Nov 5 14:34:00 2021 +0800

    [Broker] Fix prefix setting in JWT authn and avoid multi calls for the getProperty (#12132)
    
    Motivation
    Currently, the setting prefix for JWT authentication does not work because the code does not specify the property name for the token setting prefix.
    
    Modifications
    Add token setting prefix property name: tokenSettingPrefix.
    Avoid multi calls for the getProperty in JWT auth.
    Verifying this change
    This change is already covered by the existing test, such as testTokenSettingPrefix.
    
    (cherry picked from commit b9a250dd0c80845da703bfbd0286044251187131)
---
 .../AuthenticationProviderToken.java               | 43 ++++++++++------------
 .../AuthenticationProviderTokenTest.java           | 41 +++++++++++++++++++++
 2 files changed, 61 insertions(+), 23 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
index 21bda4c..3801aec 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
@@ -54,7 +54,7 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
     static final String HTTP_HEADER_VALUE_PREFIX = "Bearer ";
 
     // When symmetric key is configured
-    static final String CONF_TOKEN_SETTING_PREFIX = "";
+    static final String CONF_TOKEN_SETTING_PREFIX = "tokenSettingPrefix";
 
     // When symmetric key is configured
     static final String CONF_TOKEN_SECRET_KEY = "tokenSecretKey";
@@ -253,15 +253,13 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
      * Try to get the validation key for tokens from several possible config options.
      */
     private Key getValidationKey(ServiceConfiguration conf) throws IOException {
-        if (conf.getProperty(confTokenSecretKeySettingName) != null
-                && StringUtils.isNotBlank((String) conf.getProperty(confTokenSecretKeySettingName))) {
-            final String validationKeyConfig = (String) conf.getProperty(confTokenSecretKeySettingName);
-            final byte[] validationKey = AuthTokenUtils.readKeyFromUrl(validationKeyConfig);
+        String tokenSecretKey = (String) conf.getProperty(confTokenSecretKeySettingName);
+        String tokenPublicKey = (String) conf.getProperty(confTokenPublicKeySettingName);
+        if (StringUtils.isNotBlank(tokenSecretKey)) {
+            final byte[] validationKey = AuthTokenUtils.readKeyFromUrl(tokenSecretKey);
             return AuthTokenUtils.decodeSecretKey(validationKey);
-        } else if (conf.getProperty(confTokenPublicKeySettingName) != null
-                && StringUtils.isNotBlank((String) conf.getProperty(confTokenPublicKeySettingName))) {
-            final String validationKeyConfig = (String) conf.getProperty(confTokenPublicKeySettingName);
-            final byte[] validationKey = AuthTokenUtils.readKeyFromUrl(validationKeyConfig);
+        } else if (StringUtils.isNotBlank(tokenPublicKey)) {
+            final byte[] validationKey = AuthTokenUtils.readKeyFromUrl(tokenPublicKey);
             return AuthTokenUtils.decodePublicKey(validationKey, publicKeyAlg);
         } else {
             throw new IOException("No secret key was provided for token authentication");
@@ -269,22 +267,21 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
     }
 
     private String getTokenRoleClaim(ServiceConfiguration conf) throws IOException {
-        if (conf.getProperty(confTokenAuthClaimSettingName) != null
-                && StringUtils.isNotBlank((String) conf.getProperty(confTokenAuthClaimSettingName))) {
-            return (String) conf.getProperty(confTokenAuthClaimSettingName);
+        String tokenAuthClaim = (String) conf.getProperty(confTokenAuthClaimSettingName);
+        if (StringUtils.isNotBlank(tokenAuthClaim)) {
+            return tokenAuthClaim;
         } else {
             return Claims.SUBJECT;
         }
     }
 
     private SignatureAlgorithm getPublicKeyAlgType(ServiceConfiguration conf) throws IllegalArgumentException {
-        if (conf.getProperty(confTokenPublicAlgSettingName) != null
-                && StringUtils.isNotBlank((String) conf.getProperty(confTokenPublicAlgSettingName))) {
-            String alg = (String) conf.getProperty(confTokenPublicAlgSettingName);
+        String tokenPublicAlg = (String) conf.getProperty(confTokenPublicAlgSettingName);
+        if (StringUtils.isNotBlank(tokenPublicAlg)) {
             try {
-                return SignatureAlgorithm.forName(alg);
+                return SignatureAlgorithm.forName(tokenPublicAlg);
             } catch (SignatureException ex) {
-                throw new IllegalArgumentException("invalid algorithm provided " + alg, ex);
+                throw new IllegalArgumentException("invalid algorithm provided " + tokenPublicAlg, ex);
             }
         } else {
             return SignatureAlgorithm.RS256;
@@ -293,9 +290,9 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
 
     // get Token Audience Claim from configuration, if not configured return null.
     private String getTokenAudienceClaim(ServiceConfiguration conf) throws IllegalArgumentException {
-        if (conf.getProperty(confTokenAudienceClaimSettingName) != null
-            && StringUtils.isNotBlank((String) conf.getProperty(confTokenAudienceClaimSettingName))) {
-            return (String) conf.getProperty(confTokenAudienceClaimSettingName);
+        String tokenAudienceClaim = (String) conf.getProperty(confTokenAudienceClaimSettingName);
+        if (StringUtils.isNotBlank(tokenAudienceClaim)) {
+            return tokenAudienceClaim;
         } else {
             return null;
         }
@@ -303,9 +300,9 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
 
     // get Token Audience that stands for this broker from configuration, if not configured return null.
     private String getTokenAudience(ServiceConfiguration conf) throws IllegalArgumentException {
-        if (conf.getProperty(confTokenAudienceSettingName) != null
-            && StringUtils.isNotBlank((String) conf.getProperty(confTokenAudienceSettingName))) {
-            return (String) conf.getProperty(confTokenAudienceSettingName);
+        String tokenAudience = (String) conf.getProperty(confTokenAudienceSettingName);
+        if (StringUtils.isNotBlank(tokenAudience)) {
+            return tokenAudience;
         } else {
             return null;
         }
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
index b05ad4c..3b58cf9 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.authentication;
 
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -55,6 +56,7 @@ import javax.naming.AuthenticationException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.common.api.AuthData;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 public class AuthenticationProviderTokenTest {
@@ -800,6 +802,45 @@ public class AuthenticationProviderTokenTest {
         provider.close();
     }
 
+    @Test
+    public void testTokenSettingPrefix() throws Exception {
+        AuthenticationProviderToken provider = new AuthenticationProviderToken();
+
+        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+        String publicKeyStr = AuthTokenUtils.encodeKeyBase64(keyPair.getPublic());
+        Properties properties = new Properties();
+        // Use public key for validation
+        properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_PUBLIC_KEY, publicKeyStr);
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setProperties(properties);
+
+        ServiceConfiguration mockConf = Mockito.mock(ServiceConfiguration.class);
+        String prefix = "test";
+
+        Mockito.when(mockConf.getProperty(anyString()))
+                .thenAnswer(invocationOnMock ->
+                        conf.getProperty(((String) invocationOnMock.getArgument(0)).substring(prefix.length()))
+                );
+        Mockito.when(mockConf.getProperty(AuthenticationProviderToken.CONF_TOKEN_SETTING_PREFIX)).thenReturn(prefix);
+
+        provider.initialize(mockConf);
+
+        // Each property is fetched only once. Prevent multiple fetches.
+        Mockito.verify(mockConf, Mockito.times(1)).getProperty(AuthenticationProviderToken.CONF_TOKEN_SETTING_PREFIX);
+        Mockito.verify(mockConf, Mockito.times(1))
+                .getProperty(prefix + AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY);
+        Mockito.verify(mockConf, Mockito.times(1))
+                .getProperty(prefix + AuthenticationProviderToken.CONF_TOKEN_PUBLIC_KEY);
+        Mockito.verify(mockConf, Mockito.times(1))
+                .getProperty(prefix + AuthenticationProviderToken.CONF_TOKEN_AUTH_CLAIM);
+        Mockito.verify(mockConf, Mockito.times(1))
+                .getProperty(prefix + AuthenticationProviderToken.CONF_TOKEN_PUBLIC_ALG);
+        Mockito.verify(mockConf, Mockito.times(1))
+                .getProperty(prefix + AuthenticationProviderToken.CONF_TOKEN_AUDIENCE_CLAIM);
+        Mockito.verify(mockConf, Mockito.times(1))
+                .getProperty(prefix + AuthenticationProviderToken.CONF_TOKEN_AUDIENCE);
+    }
+
     private static String createTokenWithAudience(Key signingKey, String audienceClaim, List<String> audience) {
         JwtBuilder builder = Jwts.builder()
                 .setSubject(SUBJECT)