Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:18:26 UTC

[pulsar] branch branch-2.6 updated (848e53d -> 63d4078)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a change to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 848e53d  Fix producer stucks on creating ledger timeout (#7319)
     new 3dc58d0  limiting batch size to the minimum of the maxNumberOfMessages and maxSizeOfMessages (#6865)
     new 1593dfa  Fix hash range conflict issue in Key_Shared with sticky hash range (#7231)
     new e752e0f  Fix lookup permission error (#7234)
     new cbf5bb2  Update Jetty to 9.4.29 (#7235)
     new 450ec37  Fixed readers backlog stats after data is skipped (#7236)
     new dd2cadb  Fix the regression from #6428 (#7241)
     new 86c69e8  Avoid introduce null read position for the managed cursor. (#7264)
     new 3498f40  typo (#7281)
     new a994d5d  Fix partition index error in close callback (#7282)
     new 231aaa4  [PIP-55][Doc]--Update security overview (#7302)
     new 4c4107a  Fix issue where HTTP header used in Athenz authentication can not be renamed (#7311)
     new b6d71b7  Fix issue #7315 (#7316)
     new a5a863e  Fixing go instance config port apache/pulsar#7267 (#7322)
     new 14710a9   chunking for PIP-37 support large message size (#7334)
     new a050d78  Add more detail information of retry errors (#7341)
     new 4b103cd  Handling error in creation of non-durable cursor (#7355)
     new 8c61215  Fix bug related to managedLedger properties (#7357)
     new b48120f  Use fully qualified hostname as default to advertise worker. (#7360)
     new bbcb4ee  Use hostname for bookie rackawareness mapping (#7361)
     new 63d4078  Fix conflict

The 20 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 distribution/server/src/assemble/LICENSE.bin.txt   |  34 ++--
 .../standalone-dashboard/docker-compose.yml        |   2 +-
 .../apache/bookkeeper/mledger/AsyncCallbacks.java  |   6 +-
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  46 +++++-
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   2 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 184 +++++++++++++++------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 112 ++++++++++---
 .../mledger/impl/NonDurableCursorTest.java         |  45 +++++
 pom.xml                                            |   2 +-
 .../authorization/AuthorizationProvider.java       |  14 +-
 .../broker/authorization/AuthorizationService.java |  14 +-
 .../authorization/PulsarAuthorizationProvider.java |  10 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  20 ++-
 ...ashRangeExclusiveStickyKeyConsumerSelector.java |   9 +-
 .../broker/service/persistent/PersistentTopic.java |  59 +++----
 .../pulsar/broker/admin/AdminApiSchemaTest.java    |   3 +
 .../apache/pulsar/broker/admin/NamespacesTest.java |  17 +-
 .../broker/service/BacklogQuotaManagerTest.java    |   2 +-
 .../pulsar/broker/service/BrokerServiceTest.java   |   8 +-
 .../client/api/ConsumerBatchReceiveTest.java       |  31 ++++
 .../client/api/KeySharedSubscriptionTest.java      |  64 +++++++
 .../pulsar/client/admin/PulsarAdminException.java  |   2 +-
 .../admin/internal/http/AsyncHttpConnector.java    |   5 +-
 .../pulsar/client/api/PulsarClientException.java   |  19 +++
 .../client/impl/auth/AuthenticationAthenz.java     |  16 +-
 .../client/impl/auth/AuthenticationAthenzTest.java |  89 ++++++----
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |  14 +-
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   |  16 +-
 pulsar-client-cpp/lib/ProducerImpl.h               |   2 +
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   2 +
 .../apache/pulsar/client/impl/MessagesImpl.java    |  12 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |   1 +
 .../pulsar/functions/runtime/RuntimeUtils.java     |   1 +
 .../pulsar/functions/worker/WorkerConfig.java      |   3 +-
 pulsar-sql/presto-distribution/LICENSE             |  12 +-
 .../zookeeper/ZkBookieRackAffinityMapping.java     |   5 -
 site2/docs/assets/chunking-01.png                  | Bin 0 -> 11881 bytes
 site2/docs/assets/chunking-02.png                  | Bin 0 -> 30135 bytes
 site2/docs/concepts-messaging.md                   |  26 +++
 site2/docs/security-overview.md                    |   4 +
 40 files changed, 681 insertions(+), 232 deletions(-)
 create mode 100644 site2/docs/assets/chunking-01.png
 create mode 100644 site2/docs/assets/chunking-02.png


[pulsar] 01/20: limiting batch size to the minimum of the maxNumberOfMessages and maxSizeOfMessages (#6865)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3dc58d0e3f48152d9002d05dd6ebfa1464e345c1
Author: avimas <43...@users.noreply.github.com>
AuthorDate: Wed Jun 10 10:31:15 2020 +0300

    limiting batch size to the minimum of the maxNumberOfMessages and maxSizeOfMessages (#6865)
    
    Batch size is not limited to the minimum of the maxNumberOfMessages and maxSizeOfMessages from the BatchReceive policy.
    
     1. Batch size is not limited to the minimum of the maxNumberOfMessages and maxSizeOfMessages from the BatchReceive policy.
    This issue can be easily fixed by changing the canAdd function in MessagesImpl from:
    ```
    protected boolean canAdd(Message<T> message) {
        if (this.maxNumberOfMessages <= 0 && this.maxSizeOfMessages <= 0L) {
            return true;
        } else {
            return this.maxNumberOfMessages > 0 && this.currentNumberOfMessages + 1 <= this.maxNumberOfMessages || this.maxSizeOfMessages > 0L &&   this.currentSizeOfMessages + (long)message.getData().length <= this.maxSizeOfMessages;
        }
    }
    ```
    to (changing the condition in the else branch to && instead of ||):
    ```
    protected boolean canAdd(Message<T> message) {
        if (this.maxNumberOfMessages <= 0 && this.maxSizeOfMessages <= 0L) {
            return true;
        } else {
            return (this.maxNumberOfMessages > 0 && this.currentNumberOfMessages + 1 <= this.maxNumberOfMessages) && (this.maxSizeOfMessages > 0L &&   this.currentSizeOfMessages + (long)message.getData().length <= this.maxSizeOfMessages);
        }
    }
    ```
    
     2. When the batch size is higher than the receiver queue size of the consumer (I used a batch size of 3000 and a receiver queue size of 500), I noticed the following issue:
    	a. In a multi-topic (pattern) consumer the client stops receiving messages: the consumer gets paused and is never resumed. When a timeout is set in the batch policy, only one batch is fetched and the client is never resumed.
    
    I talked with @codelipenghui, who advised opening this pull request.
    
    (cherry picked from commit 941ddd683d748588257e96c013a520c116c7c7df)
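
For illustration only (not part of this commit): a minimal sketch of the behavior the corrected `canAdd` enables, assuming a local broker and hypothetical topic/subscription names. With the fix, a returned batch honors both the message-count limit and the byte-size limit of the policy, whichever is reached first.

```
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class BatchReceiveLimitExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

        // Limit each batch to at most 100 messages AND at most 1 MB of payload.
        BatchReceivePolicy policy = BatchReceivePolicy.builder()
                .maxNumMessages(100)
                .maxNumBytes(1024 * 1024)
                .timeout(200, TimeUnit.MILLISECONDS)
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://my-tenant/my-ns/batch-receive-demo") // hypothetical topic
                .subscriptionName("batch-sub")                            // hypothetical subscription
                .batchReceivePolicy(policy)
                .subscribe();

        Messages<String> batch = consumer.batchReceive();
        System.out.println("Received " + batch.size() + " messages in one batch");

        consumer.close();
        client.close();
    }
}
```
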
---
 .../client/api/ConsumerBatchReceiveTest.java       | 31 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/MessagesImpl.java    | 12 ++++++---
 .../client/impl/MultiTopicsConsumerImpl.java       |  1 +
 3 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
index 54a6e52..eb65198 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerBatchReceiveTest.java
@@ -214,6 +214,37 @@ public class ConsumerBatchReceiveTest extends ProducerConsumerBase {
         testBatchReceiveAndRedelivery(topic, batchReceivePolicy, batchProduce, receiverQueueSize);
     }
 
+    @Test
+    public void verifyBatchSizeIsEqualToPolicyConfiguration() throws Exception {
+        final int muxNumMessages = 100;
+        final int messagesToSend = 500;
+
+        final String topic = "persistent://my-property/my-ns/batch-receive-size" + UUID.randomUUID();
+        BatchReceivePolicy batchReceivePolicy = BatchReceivePolicy.builder().maxNumMessages(muxNumMessages).build();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("s2")
+                .batchReceivePolicy(batchReceivePolicy)
+                .subscribe();
+
+        sendMessagesAsyncAndWait(producer, messagesToSend);
+        receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(consumer, batchReceivePolicy, messagesToSend / muxNumMessages);
+    }
+
+
+    private void receiveAllBatchesAndVerifyBatchSizeIsEqualToMaxNumMessages(Consumer<String> consumer, BatchReceivePolicy batchReceivePolicy, int numOfExpectedBatches) throws PulsarClientException {
+        Messages<String> messages;
+        for (int i = 0; i < numOfExpectedBatches; i++) {
+            messages = consumer.batchReceive();
+            log.info("Received {} messages in a single batch receive verifying batch size.", messages.size());
+            Assert.assertEquals(messages.size(), batchReceivePolicy.getMaxNumMessages());
+        }
+    }
+
     private void testBatchReceive(String topic, BatchReceivePolicy batchReceivePolicy, boolean batchProduce, int receiverQueueSize) throws Exception {
         ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING).topic(topic);
         if (!batchProduce) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
index 855452f..c56694e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessagesImpl.java
@@ -45,11 +45,15 @@ public class MessagesImpl<T> implements Messages<T> {
     }
 
     protected boolean canAdd(Message<T> message) {
-        if (maxNumberOfMessages <= 0 && maxSizeOfMessages <= 0) {
-            return true;
+        if (maxNumberOfMessages > 0 && currentNumberOfMessages + 1 > maxNumberOfMessages) {
+            return false;
         }
-        return (maxNumberOfMessages > 0 && currentNumberOfMessages + 1 <= maxNumberOfMessages)
-                || (maxSizeOfMessages > 0 && currentSizeOfMessages + message.getData().length <= maxSizeOfMessages);
+
+        if (maxSizeOfMessages > 0 && currentSizeOfMessages + message.getData().length > maxSizeOfMessages) {
+            return false;
+        }
+
+        return true;
     }
 
     protected void add(Message<T> message) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ca67edf..c79b437 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -394,6 +394,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             } else {
                 pendingBatchReceives.add(OpBatchReceive.of(result));
             }
+            resumeReceivingFromPausedConsumersIfNeeded();
         } finally {
             lock.writeLock().unlock();
         }


[pulsar] 05/20: Fixed readers backlog stats after data is skipped (#7236)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 450ec37256f0778bbe4a1d67e927266847aba101
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 15 18:25:46 2020 -0700

    Fixed readers backlog stats after data is skipped (#7236)
    
    ### Motivation
    
    The metrics for the reader backlog keep increasing when data is dropped because the reader cursor only moves on the next read attempt.
    Instead, we should proactively move the cursor forward to the first valid ledger.
    
    (cherry picked from commit 6b9c90ff89f541e84662292d450f628597c4b95a)
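
For illustration only (not part of this commit): a hedged sketch of where the affected figure surfaces, using the Java admin client; the service URL and topic name are placeholders. Reader subscriptions are backed by non-durable cursors, and before this fix their reported `msgBacklog` did not drop when consumed ledgers were trimmed.

```
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.TopicStats;

public class ReaderBacklogCheck {
    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // placeholder admin endpoint
                .build();

        TopicStats stats = admin.topics().getStats("persistent://my-tenant/my-ns/reader-topic");

        // Reader (non-durable) subscriptions show up here; before this fix their msgBacklog
        // kept growing even after the consumed ledgers had been trimmed.
        stats.subscriptions.forEach((name, sub) ->
                System.out.println(name + " backlog=" + sub.msgBacklog));

        admin.close();
    }
}
```
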
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 36 ++++++++++++++++-
 .../mledger/impl/NonDurableCursorTest.java         | 45 ++++++++++++++++++++++
 .../broker/service/BacklogQuotaManagerTest.java    |  2 +-
 3 files changed, 81 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 5fd9b24..4b3937f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -44,11 +44,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.Random;
 import java.util.UUID;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -85,6 +85,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
@@ -2067,6 +2068,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 return;
             }
 
+            advanceNonDurableCursors(ledgersToDelete);
+
             // Update metadata
             for (LedgerInfo ls : ledgersToDelete) {
                 ledgerCache.remove(ls.getLedgerId());
@@ -2126,6 +2129,37 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     /**
+     * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data.
+     * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
+     * entries and the stats are reported correctly.
+     */
+    private void advanceNonDurableCursors(List<LedgerInfo> ledgersToDelete) {
+        if (ledgersToDelete.isEmpty()) {
+            return;
+        }
+
+        long firstNonDeletedLedger = ledgers
+                .higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
+        PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
+
+        cursors.forEach(cursor -> {
+            if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0) {
+                cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
+                    @Override
+                    public void markDeleteComplete(Object ctx) {
+                    }
+
+                    @Override
+                    public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                        log.warn("[{}] Failed to mark delete while trimming data ledgers: {}", name,
+                                exception.getMessage());
+                    }
+                }, null);
+            }
+        });
+    }
+
+    /**
      * Delete this ManagedLedger completely from the system.
      *
      * @throws Exception
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 2772541..7a15035 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -28,8 +28,11 @@ import static org.testng.Assert.fail;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 import java.nio.charset.Charset;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -678,6 +681,48 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testBacklogStatsWhenDroppingData() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testBacklogStatsWhenDroppingData",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = Lists.newArrayList();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 10);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 10);
+
+        c1.markDelete(positions.get(4));
+        assertEquals(c1.getNumberOfEntries(), 5);
+        assertEquals(c1.getNumberOfEntriesInBacklog(true), 5);
+
+        // Since the durable cursor has moved, the data will be trimmed
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 6);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 6);
+
+        c1.close();
+        ledger.deleteCursor(c1.getName());
+        promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 1);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 1);
+
+        ledger.close();
+    }
+
     @Test(expectedExceptions = NullPointerException.class)
     void testCursorWithNameIsNotNull() throws Exception {
         final String p1CursorName = "entry-1";
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 8636cf1..0c317a0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -161,7 +161,7 @@ public class BacklogQuotaManagerTest {
             // non-durable mes should still
             assertEquals(stats.subscriptions.size(), 1);
             long nonDurableSubscriptionBacklog = stats.subscriptions.values().iterator().next().msgBacklog;
-            assertEquals(nonDurableSubscriptionBacklog, numMsgs,
+            assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
               "non-durable subscription backlog is [" + nonDurableSubscriptionBacklog + "]"); ;
 
             try {


[pulsar] 13/20: Fixing go instance config port apache/pulsar#7267 (#7322)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a5a863e4a8738dd99ed3f9e9eb11aacee942db8a
Author: Bogdan <bn...@gmail.com>
AuthorDate: Sun Jul 5 08:24:18 2020 +0300

    Fixing go instance config port apache/pulsar#7267 (#7322)
    
    ## Motivation
    As described in the original issue, whenever you try to localrun or create a Golang function, the process port is set to 0. This leads to health check errors.
    
    ## Modifications
    The original method that returns golang cmd arguments didn't use the port variable
    from the instanceConfig parameter.
    This is a one-line modification that fixes the problem.
    
    (cherry picked from commit 8c5566434031298f316410d7f2ec0ac7373fc42b)
---
 .../src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java  | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index ca70b42..fca33b7 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -214,6 +214,7 @@ public class RuntimeUtils {
         }
 
         goInstanceConfig.setKillAfterIdleMs(0);
+        goInstanceConfig.setPort(instanceConfig.getPort());
 
         // Parse the contents of goInstanceConfig into json form string
         ObjectMapper objectMapper = ObjectMapperFactory.getThreadLocal();


[pulsar] 06/20: Fix the regression from #6428 (#7241)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dd2cadb9a2a11b5c5c67b028ce65ebef4a732ad9
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Tue Jun 23 20:15:33 2020 -0500

    Fix the regression from #6428 (#7241)
    
    ### Motivation
    In #6708, we changed to use `isSuperUser(String, AuthenticationDataSource, ServiceConfiguration)` for the dynamic super-user check based on AuthenticationDataSource, while #6428 still used the old method `isSuperUser(String, ServiceConfiguration)`.
    This change switches those call sites back to the new method.
    
    ### Modifications
    
    Switch `isSuperUser(String, ServiceConfiguration)` calls to `isSuperUser(String, AuthenticationDataSource, ServiceConfiguration)`.
    
    (cherry picked from commit 45afb5690ed761a5c18ca3dc95669a7e0c39aed2)
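
For illustration only (not part of this commit): the three-argument overload restored above is the extension point a custom provider can override. The class name and the admin-prefix rule below are hypothetical.

```
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;

public class CustomAuthorizationProvider extends PulsarAuthorizationProvider {

    @Override
    public CompletableFuture<Boolean> isSuperUser(String role,
                                                  AuthenticationDataSource authenticationData,
                                                  ServiceConfiguration serviceConfiguration) {
        // With the restored overload, a provider can take the per-connection
        // authentication data into account instead of only the static role set.
        if (role != null && role.startsWith("admin-")) { // illustrative rule only
            return CompletableFuture.completedFuture(true);
        }
        return super.isSuperUser(role, authenticationData, serviceConfiguration);
    }
}
```
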
---
 .../pulsar/broker/authorization/AuthorizationProvider.java | 14 ++++++++------
 .../broker/authorization/PulsarAuthorizationProvider.java  | 10 +++++-----
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index c4fcc5e..4eb5d93 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -47,25 +47,27 @@ public interface AuthorizationProvider extends Closeable {
     /**
      * Check if specified role is a super user
      * @param role the role to check
+     * @param authenticationData authentication data related to the role
      * @return a CompletableFuture containing a boolean in which true means the role is a super user
      * and false if it is not
      */
-    default CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration serviceConfiguration) {
+    default CompletableFuture<Boolean> isSuperUser(String role,
+                                                   AuthenticationDataSource authenticationData,
+                                                   ServiceConfiguration serviceConfiguration) {
         Set<String> superUserRoles = serviceConfiguration.getSuperUserRoles();
         return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role) ? true : false);
     }
 
     /**
+     * @deprecated Use method {@link #isSuperUser(String, AuthenticationDataSource, ServiceConfiguration)}
      * Check if specified role is a super user
      * @param role the role to check
-     * @param authenticationData authentication data related to the role
      * @return a CompletableFuture containing a boolean in which true means the role is a super user
      * and false if it is not
      */
-    default CompletableFuture<Boolean> isSuperUser(String role,
-                                                   AuthenticationDataSource authenticationData,
-                                                   ServiceConfiguration serviceConfiguration) {
-        return isSuperUser(role, serviceConfiguration);
+    default CompletableFuture<Boolean> isSuperUser(String role, ServiceConfiguration serviceConfiguration) {
+        Set<String> superUserRoles = serviceConfiguration.getSuperUserRoles();
+        return CompletableFuture.completedFuture(role != null && superUserRoles.contains(role) ? true : false);
     }
 
     /**
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 40b2021..1aa79bf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -322,7 +322,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
             String role, String authDataJson) {
         return updateSubscriptionPermissionAsync(namespace, subscriptionName, Collections.singleton(role), true);
     }
-    
+
     private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName namespace, String subscriptionName, Set<String> roles,
             boolean remove) {
         CompletableFuture<Void> result = new CompletableFuture<>();
@@ -549,7 +549,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
                     new IllegalStateException("TopicOperation is not supported."));
         }
 
-        CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role, conf);
+        CompletableFuture<Boolean> isSuperUserFuture = isSuperUser(role, authData, conf);
 
         return isSuperUserFuture
                 .thenCombine(isAuthorizedFuture, (isSuperUser, isAuthorized) -> isSuperUser || isAuthorized);
@@ -573,14 +573,14 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
 
             if (role != null && conf.getProxyRoles().contains(role)) {
                 // role check
-                CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, conf);
+                CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, authData, conf);
                 CompletableFuture<Boolean> isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
                 CompletableFuture<Boolean> isRoleAuthorizedFuture = isRoleSuperUserFuture
                         .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
                                 isRoleSuperUser || isRoleTenantAdmin);
 
                 // originalRole check
-                CompletableFuture<Boolean> isOriginalRoleSuperUserFuture = isSuperUser(originalRole, conf);
+                CompletableFuture<Boolean> isOriginalRoleSuperUserFuture = isSuperUser(originalRole, authData, conf);
                 CompletableFuture<Boolean> isOriginalRoleTenantAdminFuture = isTenantAdmin(tenantName, originalRole,
                         tenantInfo, authData);
                 CompletableFuture<Boolean> isOriginalRoleAuthorizedFuture = isOriginalRoleSuperUserFuture
@@ -593,7 +593,7 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
                                 isRoleAuthorized && isOriginalRoleAuthorized);
             } else {
                 // role check
-                CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, conf);
+                CompletableFuture<Boolean> isRoleSuperUserFuture = isSuperUser(role, authData, conf);
                 CompletableFuture<Boolean> isRoleTenantAdminFuture = isTenantAdmin(tenantName, role, tenantInfo, authData);
                 return isRoleSuperUserFuture
                         .thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->


[pulsar] 11/20: Fix issue where HTTP header used in Athenz authentication can not be renamed (#7311)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4c4107a42586c983a298f2fa6fe4138abf5a106c
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Sat Jun 20 13:46:25 2020 +0900

    Fix issue where HTTP header used in Athenz authentication can not be renamed (#7311)
    
    
    (cherry picked from commit 82e30678425f423dd28fead0ba8840d1c521ad61)
---
 .../client/impl/auth/AuthenticationAthenz.java     | 16 ++--
 .../client/impl/auth/AuthenticationAthenzTest.java | 89 ++++++++++++++--------
 2 files changed, 68 insertions(+), 37 deletions(-)

diff --git a/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java b/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
index 3e0aef3..cf93064 100644
--- a/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
+++ b/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenz.java
@@ -59,6 +59,7 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
     private String providerDomain;
     private PrivateKey privateKey;
     private String keyId = "0";
+    private String roleHeader = null;
     // If auto prefetching is enabled, application will not complete until the static method
     // ZTSClient.cancelPrefetch() is called.
     // cf. https://github.com/yahoo/athenz/issues/544
@@ -80,7 +81,7 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
     @Override
     synchronized public AuthenticationDataProvider getAuthData() throws PulsarClientException {
         if (cachedRoleTokenIsValid()) {
-            return new AuthenticationDataAthenz(roleToken, ZTSClient.getHeader());
+            return new AuthenticationDataAthenz(roleToken, isNotBlank(roleHeader) ? roleHeader : ZTSClient.getHeader());
         }
         try {
             // the following would set up the API call that requests tokens from the server
@@ -89,7 +90,7 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
             RoleToken token = getZtsClient().getRoleToken(providerDomain, null, minValidity, maxValidity, false);
             roleToken = token.getToken();
             cachedRoleTokenTimestamp = System.nanoTime();
-            return new AuthenticationDataAthenz(roleToken, ZTSClient.getHeader());
+            return new AuthenticationDataAthenz(roleToken, isNotBlank(roleHeader) ? roleHeader : ZTSClient.getHeader());
         } catch (Throwable t) {
             throw new GettingAuthenticationDataException(t);
         }
@@ -142,16 +143,17 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
         this.keyId = authParams.getOrDefault("keyId", "0");
         this.autoPrefetchEnabled = Boolean.valueOf(authParams.getOrDefault("autoPrefetchEnabled", "false"));
 
-        if (authParams.containsKey("athenzConfPath")) {
+        if (isNotBlank(authParams.get("athenzConfPath"))) {
             System.setProperty("athenz.athenz_conf", authParams.get("athenzConfPath"));
         }
-        if (authParams.containsKey("principalHeader")) {
+        if (isNotBlank(authParams.get("principalHeader"))) {
             System.setProperty("athenz.auth.principal.header", authParams.get("principalHeader"));
         }
-        if (authParams.containsKey("roleHeader")) {
-            System.setProperty("athenz.auth.role.header", authParams.get("roleHeader"));
+        if (isNotBlank(authParams.get("roleHeader"))) {
+            this.roleHeader = authParams.get("roleHeader");
+            System.setProperty("athenz.auth.role.header", this.roleHeader);
         }
-        if (authParams.containsKey("ztsUrl")) {
+        if (isNotBlank(authParams.get("ztsUrl"))) {
             this.ztsUrl = authParams.get("ztsUrl");
         }
     }
diff --git a/pulsar-client-auth-athenz/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenzTest.java b/pulsar-client-auth-athenz/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenzTest.java
index f6f6968..6323f17 100644
--- a/pulsar-client-auth-athenz/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenzTest.java
+++ b/pulsar-client-auth-athenz/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationAthenzTest.java
@@ -51,6 +51,38 @@ public class AuthenticationAthenzTest {
     private static final String TENANT_DOMAIN = "test_tenant";
     private static final String TENANT_SERVICE = "test_service";
 
+    class MockZTSClient extends ZTSClient {
+        public MockZTSClient(String ztsUrl) {
+            super(ztsUrl);
+        }
+
+        @Override
+        public RoleToken getRoleToken(String domainName, String roleName, Integer minExpiryTime, Integer maxExpiryTime,
+                boolean ignoreCache) {
+            List<String> roles = new ArrayList<String>() {
+                {
+                    add("test_role");
+                }
+            };
+            com.yahoo.athenz.auth.token.RoleToken roleToken = new com.yahoo.athenz.auth.token.RoleToken.Builder(
+                    "Z1", domainName, roles).principal(String.format("%s.%s", TENANT_DOMAIN, TENANT_SERVICE))
+                            .build();
+
+            try {
+                String ztsPrivateKey = new String(
+                        Files.readAllBytes(Paths.get("./src/test/resources/zts_private.pem")));
+                roleToken.sign(ztsPrivateKey);
+            } catch (IOException e) {
+                return null;
+            }
+
+            RoleToken token = new RoleToken();
+            token.setToken(roleToken.getSignedToken());
+
+            return token;
+        }
+    }
+
     @BeforeClass
     public void setup() throws Exception {
         String paramsStr = new String(Files.readAllBytes(Paths.get("./src/test/resources/authParams.json")));
@@ -59,36 +91,7 @@ public class AuthenticationAthenzTest {
         // Set mock ztsClient which returns fixed token instead of fetching from ZTS server
         Field field = auth.getClass().getDeclaredField("ztsClient");
         field.setAccessible(true);
-        ZTSClient mockZtsClient = new ZTSClient("dummy") {
-            @Override
-            public RoleToken getRoleToken(String domainName, String roleName, Integer minExpiryTime,
-                    Integer maxExpiryTime, boolean ignoreCache) {
-
-                List<String> roles = new ArrayList<String>() {
-                    {
-                        add("test_role");
-                    }
-                };
-                com.yahoo.athenz.auth.token.RoleToken roleToken = new com.yahoo.athenz.auth.token.RoleToken.Builder(
-                        "Z1", domainName, roles).principal(String.format("%s.%s", TENANT_DOMAIN, TENANT_SERVICE))
-                                .build();
-
-                try {
-                    String ztsPrivateKey = new String(
-                            Files.readAllBytes(Paths.get("./src/test/resources/zts_private.pem")));
-                    roleToken.sign(ztsPrivateKey);
-                } catch (IOException e) {
-                    return null;
-                }
-
-                RoleToken token = new RoleToken();
-                token.setToken(roleToken.getSignedToken());
-
-                return token;
-            }
-
-        };
-        field.set(auth, mockZtsClient);
+        field.set(auth, new MockZTSClient("dummy"));
     }
 
     @Test
@@ -194,4 +197,30 @@ public class AuthenticationAthenzTest {
         assertFalse((boolean) field.get(auth2));
         auth2.close();
     }
+
+    @Test
+    public void testRoleHeaderSetting() throws Exception {
+        assertEquals(auth.getAuthData().getHttpHeaders().iterator().next().getKey(), ZTSClient.getHeader());
+
+        Field field = auth.getClass().getDeclaredField("ztsClient");
+        field.setAccessible(true);
+
+        String paramsStr = new String(Files.readAllBytes(Paths.get("./src/test/resources/authParams.json")));
+        ObjectMapper jsonMapper = ObjectMapperFactory.create();
+        Map<String, String> authParamsMap = jsonMapper.readValue(paramsStr, new TypeReference<HashMap<String, String>>() { });
+
+        authParamsMap.put("roleHeader", "");
+        AuthenticationAthenz auth1 = new AuthenticationAthenz();
+        auth1.configure(jsonMapper.writeValueAsString(authParamsMap));
+        field.set(auth1, new MockZTSClient("dummy"));
+        assertEquals(auth1.getAuthData().getHttpHeaders().iterator().next().getKey(), ZTSClient.getHeader());
+        auth1.close();
+
+        authParamsMap.put("roleHeader", "Test-Role-Header");
+        AuthenticationAthenz auth2 = new AuthenticationAthenz();
+        auth2.configure(jsonMapper.writeValueAsString(authParamsMap));
+        field.set(auth2, new MockZTSClient("dummy"));
+        assertEquals(auth2.getAuthData().getHttpHeaders().iterator().next().getKey(), "Test-Role-Header");
+        auth2.close();
+    }
 }


[pulsar] 12/20: Fix issue #7315 (#7316)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b6d71b7b71854ae2b63eec6922194c4099df2963
Author: Fernando Miguélez Palomo <fe...@gmail.com>
AuthorDate: Tue Jul 28 16:07:39 2020 +0200

    Fix issue #7315 (#7316)
    
    Fixes issue #7315
    
    Co-authored-by: xiaolong.ran <rx...@apache.org>
    (cherry picked from commit a74fe60b85537e8f94ef2276c5758b9893a4d6ee)
---
 docker-compose/standalone-dashboard/docker-compose.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docker-compose/standalone-dashboard/docker-compose.yml b/docker-compose/standalone-dashboard/docker-compose.yml
index 2a3a422..10ece5e 100644
--- a/docker-compose/standalone-dashboard/docker-compose.yml
+++ b/docker-compose/standalone-dashboard/docker-compose.yml
@@ -27,7 +27,7 @@ services:
       - 8080
       - 6650 
     environment:
-      - PULSAR_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"
+      - BOOKIE_MEM=" -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"
     command: >
       /bin/bash -c
       "bin/apply-config-from-env.py conf/standalone.conf


[pulsar] 07/20: Avoid introduce null read position for the managed cursor. (#7264)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 86c69e86db65c0e1d54a89dba41a6a2879b29767
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Jun 16 09:20:18 2020 +0800

    Avoid introduce null read position for the managed cursor. (#7264)
    
    ### Motivation
    
    Avoid introducing a null read position for the managed cursor.
    
    Here is the error log related to the null read position:
    ```
    18:52:13.366 [pulsar-stats-updater-23-1] ERROR org.apache.pulsar.broker.service.persistent.PersistentTopic - Got exception when creating consumer stats for subscription itom-di-dp-preload_chotest_2-reader-4bd4e3dd50: null
    java.lang.NullPointerException: null
    	at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:877) ~[com.google.guava-guava-25.1-jre.jar:?]
    	at org.apache.bookkeeper.mledger.impl.PositionImpl.compareTo(PositionImpl.java:92) ~[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
    	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNumberOfEntriesSinceFirstNotAckedMessage(ManagedCursorImpl.java:721) ~[org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.getNumberOfEntriesSinceFirstNotAckedMessage(PersistentSubscription.java:790) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$updateRates$46(PersistentTopic.java:1419) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.persistent.PersistentTopic.updateRates(PersistentTopic.java:1387) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.PulsarStats.lambda$null$1(PulsarStats.java:134) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.PulsarStats.lambda$null$3(PulsarStats.java:131) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.PulsarStats.lambda$updateStats$4(PulsarStats.java:120) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.PulsarStats.updateStats(PulsarStats.java:110) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.pulsar.broker.service.BrokerService.updateRates(BrokerService.java:1145) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
    	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
    	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_242]
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_242]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_242]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_242]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_242]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_242]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
    	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
    ```
    The most suspicious part is the `getNextValidPosition` method in ManagedLedgerImpl. If given a position greater than the last add position, it returns null, which may cause the read position to become null. I haven't yet found how this situation arises, so in this PR I added a log that prints the stack trace (to help find the root cause) and a fallback to the position following the last confirmed entry whenever a null next valid position occurs.
    
    (cherry picked from commit 7955cef6c5dff2f22cfc91e53d1d29562f232846)
---
 .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java   |  2 +-
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java   | 13 ++++++++++++-
 .../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java   |  2 ++
 .../pulsar/broker/service/persistent/PersistentTopic.java   |  2 +-
 4 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 304429d..66e977f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -780,7 +780,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         // validate it before preparing range
         PositionImpl markDeletePosition = this.markDeletePosition;
         PositionImpl readPosition = this.readPosition;
-        return (markDeletePosition.compareTo(readPosition) < 0)
+        return (markDeletePosition != null && readPosition != null && markDeletePosition.compareTo(readPosition) < 0)
                 ? ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, readPosition))
                 : 0;
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4b3937f..27d8848 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2864,11 +2864,22 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     public PositionImpl getNextValidPosition(final PositionImpl position) {
+        PositionImpl next;
+        try {
+            next = getNextValidPositionInternal(position);
+        } catch (NullPointerException e) {
+            next = lastConfirmedEntry.getNext();
+            log.error("[{}] Can't find next valid position, fail back to the next position of the last position.", name, e);
+        }
+        return next;
+    }
+
+    public PositionImpl getNextValidPositionInternal(final PositionImpl position) {
         PositionImpl nextPosition = position.getNext();
         while (!isValidPosition(nextPosition)) {
             Long nextLedgerId = ledgers.ceilingKey(nextPosition.getLedgerId() + 1);
             if (nextLedgerId == null) {
-                return null;
+                throw new NullPointerException();
             }
             nextPosition = PositionImpl.get(nextLedgerId, 0);
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index b5b702f..e507c99 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1966,6 +1966,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         assertEquals(ledger.getNextValidPosition((PositionImpl) c1.getMarkDeletedPosition()), p1);
         assertEquals(ledger.getNextValidPosition(p1), p2);
         assertEquals(ledger.getNextValidPosition(p3), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
+        assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
+        assertEquals(ledger.getNextValidPosition(PositionImpl.get(p3.getLedgerId() + 1, p3.getEntryId() + 1)), PositionImpl.get(p3.getLedgerId(), p3.getEntryId() + 1));
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index a7fa72d..d7fcd41 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -624,7 +624,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 future.completeExceptionally(e);
             }
         }).exceptionally(ex -> {
-            log.warn("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex.getMessage());
+            log.error("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex);
             USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
             future.completeExceptionally(new PersistenceException(ex));
             return null;


[pulsar] 19/20: Use hostname for bookie rackawareness mapping (#7361)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bbcb4eed12e5084aed6555a2ed97dfdb0e5c4913
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Jun 27 16:39:46 2020 -0700

    Use hostname for bookie rackawareness mapping (#7361)
    
    ### Motivation
    
    In #5607, `useHostName()` was added with `return false`. That means the rackaware policy will try to resolve the bookie's hostname into an IP and then use that IP to figure out which rack the bookie belongs to.
    
    There are 2 problems:
     1. The IP won't match the hostname which is recorded in the `/bookies` z-node.
     2. If there is an error in resolving the bookie hostname (eg: a transient DNS error), an NPE will be triggered and the BK client will never register this bookie as available in the cluster.
    
    ```
    java.lang.NullPointerException: null
    	at org.apache.bookkeeper.net.NetUtils.resolveNetworkLocation(NetUtils.java:77) ~[org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    	at org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.resolveNetworkLocation(TopologyAwareEnsemblePlacementPolicy.java:779) ~[org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    	at org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.createBookieNode(TopologyAwareEnsemblePlacementPolicy.java:775) ~[org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    	at org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.handleBookiesThatJoined(TopologyAwareEnsemblePlacementPolicy.java:707) [org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    	at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.handleBookiesThatJoined(RackawareEnsemblePlacementPolicyImpl.java:79) [org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    	at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.handleBookiesThatJoined(RackawareEnsemblePlacementPolicy.java:246) [org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    	at org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.onClusterChanged(TopologyAwareEnsemblePlacementPolicy.java:654) [org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    	at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.onClusterChanged(RackawareEnsemblePlacementPolicyImpl.java:79) [org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    	at org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy.onClusterChanged(RackawareEnsemblePlacementPolicy.java:89) [org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    	at org.apache.bookkeeper.client.BookieWatcherImpl.processReadOnlyBookiesChanged(BookieWatcherImpl.java:190) [org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    	at org.apache.bookkeeper.client.BookieWatcherImpl.lambda$initialBlockingBookieRead$2(BookieWatcherImpl.java:209) [org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    	at org.apache.bookkeeper.discover.ZKRegistrationClient$WatchTask.accept(ZKRegistrationClient.java:139) [org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    	at org.apache.bookkeeper.discover.ZKRegistrationClient$WatchTask.accept(ZKRegistrationClient.java:62) [org.apache.bookkeeper-bookkeeper-server-4.10.0.jar:4.10.0]
    ```
    
    The exception is thrown at line 77, since `getAddress()` yields `null` because the address is unresolved.
    
    ```java
    74        if (dnsResolver.useHostName()) {
    75            names.add(addr.getHostName());
    76        } else {
    77            names.add(addr.getAddress().getHostAddress());
    78        }
    ```
    
    The default implementation of `DnsResolver.useHostName()` returns true.
    
    (cherry picked from commit e073465d70e0910ea533aefa5cb4af07423f22b9)
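
For illustration only (not part of this commit): a small self-contained demonstration of why line 77 of the snippet above can throw, since an unresolved `InetSocketAddress` has no backing `InetAddress` and `getAddress()` returns null.

```
import java.net.InetSocketAddress;

public class UnresolvedAddressDemo {
    public static void main(String[] args) {
        // An address that failed (or skipped) DNS resolution has no backing InetAddress.
        InetSocketAddress addr = InetSocketAddress.createUnresolved("bookie-1.example.com", 3181);
        System.out.println(addr.getHostName()); // prints the hostname
        System.out.println(addr.getAddress());  // prints "null" -> calling .getHostAddress() on it would NPE
    }
}
```
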
---
 .../org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java     | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
index 7b9f314..93400e9 100644
--- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
+++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMapping.java
@@ -215,11 +215,6 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
     }
 
     @Override
-    public boolean useHostName() {
-        return false;
-    }
-
-    @Override
     public void onUpdate(String path, BookiesRackConfiguration data, Stat stat) {
         if (rackawarePolicy != null) {
             LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", data.toString());


[pulsar] 14/20: chunking for PIP-37 support large message size (#7334)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 14710a992750a945532b0d3bf40a0aa0de769a79
Author: HuanliMeng <48...@users.noreply.github.com>
AuthorDate: Wed Jun 24 09:14:15 2020 +0800

     chunking for PIP-37 support large message size (#7334)
    
    ### Motivation
    
    Create docs for PIP-37, which adds support for large message sizes in Pulsar.
    
    ### Modifications
    
    Add a chunking section to the Pulsar Messaging doc.
    Add two images showing how chunked messages are handled with one or more producers.
    
    (cherry picked from commit 3708c9395039e6af0a36b7c87f51a9115f0028f9)
---
 site2/docs/assets/chunking-01.png | Bin 0 -> 11881 bytes
 site2/docs/assets/chunking-02.png | Bin 0 -> 30135 bytes
 site2/docs/concepts-messaging.md  |  26 ++++++++++++++++++++++++++
 3 files changed, 26 insertions(+)

diff --git a/site2/docs/assets/chunking-01.png b/site2/docs/assets/chunking-01.png
new file mode 100644
index 0000000..f83b747
Binary files /dev/null and b/site2/docs/assets/chunking-01.png differ
diff --git a/site2/docs/assets/chunking-02.png b/site2/docs/assets/chunking-02.png
new file mode 100644
index 0000000..eb47a86
Binary files /dev/null and b/site2/docs/assets/chunking-02.png differ
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index 5805971..095fe86 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -59,6 +59,32 @@ To avoid redelivering acknowledged messages in a batch to the consumer, Pulsar i
 
 By default, batch index acknowledgement is disabled (`batchIndexAcknowledgeEnable=false`). You can enable batch index acknowledgement by setting the `batchIndexAcknowledgeEnable` parameter to `true` at the broker side. Enabling batch index acknowledgement may bring more memory overheads. So, perform this operation with caution.
 
+### Chunking
+
+#### Note
+
+> - Batching and chunking cannot be enabled simultaneously. To enable chunking, you must disable batching in advance.
+> - Chunking is only supported for persisted topics.
+> - Chunking is only supported for the exclusive and failover subscription modes.
+
+When chunking is enabled (`chunkingEnabled=true`), if the message size is greater than the allowed maximum publish-payload size, the producer splits the original message into chunked messages and publishes them with chunked metadata to the broker separately and in order. At the broker, the chunked messages are stored in the managed-ledger in the same way as ordinary messages. The only difference is that the consumer needs to buffer the chunked messages and combine them into the [...]
+
+The consumer consumes the chunked messages and buffers them until it receives all the chunks of a message. Finally, the consumer stitches the chunked messages together and places them into the receiver-queue, so the client can consume messages from there. Once the consumer consumes the entire large message and acknowledges it, it internally sends an acknowledgement for all the chunk messages associated with that large message. You can set the `maxPendingChuckedMessage` par [...]
+
+ The broker does not require any changes to support chunking for non-shared subscriptions. The broker only uses `chuckedMessageRate` to record the chunked message rate on the topic.
+
+#### Handle chunked messages with one producer and one ordered consumer
+
+As shown in the following figure, a topic has one producer which publishes large message payloads as chunked messages along with regular non-chunked messages. The producer publishes message M1 in three chunks M1-C1, M1-C2, and M1-C3. The broker stores all three chunked messages in the managed-ledger and dispatches them to the ordered (exclusive/failover) consumer in the same order. The consumer buffers all the chunked messages in memory until it receives all the chunked messages, combines them [...]
+
+![](assets/chunking-01.png)
+
+#### Handle chunked messages with multiple producers and one ordered consumer
+
+When multiple producers publish chunked messages into a single topic, the broker stores all the chunked messages coming from the different producers in the same managed-ledger. As shown below, Producer 1 publishes message M1 in three chunks M1-C1, M1-C2, and M1-C3. Producer 2 publishes message M2 in three chunks M2-C1, M2-C2, and M2-C3. All chunked messages of a specific message are still in order but might not be consecutive in the managed-ledger. This brings some memory pressure [...]
+
+![](assets/chunking-02.png)
+
 ## Consumers
 
 A consumer is a process that attaches to a topic via a subscription and then receives messages.
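
To make the chunking section above concrete, here is a minimal Java client sketch. It assumes the setting names referenced in the doc (`enableChunking` on the producer, `maxPendingChuckedMessage` on the consumer); the topic name and payload size are illustrative only.

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class ChunkingExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Chunking requires batching to be disabled and works only on persistent topics.
        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/large-messages")
                .enableBatching(false)
                .enableChunking(true)
                .create();

        // Exclusive (or failover) subscription; the consumer buffers chunks until the
        // whole message can be stitched together and placed in the receiver queue.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/large-messages")
                .subscriptionName("chunk-sub")
                .subscriptionType(SubscriptionType.Exclusive)
                .maxPendingChuckedMessage(100)
                .subscribe();

        // A payload larger than the max publish-payload size is split into chunks.
        producer.send(new byte[10 * 1024 * 1024]);

        Message<byte[]> msg = consumer.receive();
        consumer.acknowledge(msg); // acking the large message acks all of its chunks internally

        producer.close();
        consumer.close();
        client.close();
    }
}
```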


[pulsar] 09/20: Fix partition index error in close callback (#7282)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a994d5daf16c5504db6658c37ea5d1b3043a88c6
Author: Yunze Xu <xy...@gmail.com>
AuthorDate: Thu Jun 18 02:07:59 2020 +0800

    Fix partition index error in close callback (#7282)
    
    
    (cherry picked from commit 72285f27755f961b61eb4d1eca891a6979044f50)
---
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 14 +++++++-------
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 16 ++++++++--------
 pulsar-client-cpp/lib/ProducerImpl.h             |  2 ++
 3 files changed, 17 insertions(+), 15 deletions(-)

diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 88a2f0a..7f6d127 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -291,7 +291,7 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
         partitionedConsumerCreatedPromise_.setFailed(result);
         // unsubscribed all of the successfully subscribed partitioned consumers
         closeAsync(nullCallbackForCleanup);
-        LOG_DEBUG("Unable to create Consumer for partition - " << partitionIndex << " Error - " << result);
+        LOG_ERROR("Unable to create Consumer for partition - " << partitionIndex << " Error - " << result);
         return;
     }
 
@@ -350,17 +350,17 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
         return;
     }
     setState(Closed);
-    int consumerIndex = 0;
     unsigned int consumerAlreadyClosed = 0;
     // close successfully subscribed consumers
     // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased
     // when `state_` is Ready
-    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
-        ConsumerImplPtr consumer = *i;
+    for (auto& consumer : consumers_) {
         if (!consumer->isClosed()) {
-            consumer->closeAsync(std::bind(&PartitionedConsumerImpl::handleSinglePartitionConsumerClose,
-                                           shared_from_this(), std::placeholders::_1, consumerIndex,
-                                           callback));
+            auto self = shared_from_this();
+            const auto partition = consumer->getPartitionIndex();
+            consumer->closeAsync([this, self, partition, callback](Result result) {
+                handleSinglePartitionConsumerClose(result, partition, callback);
+            });
         } else {
             if (++consumerAlreadyClosed == consumers_.size()) {
                 // everything is closed already. so we are good.
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 628afbc..aa5e176 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -131,7 +131,7 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
         lock.unlock();
         closeAsync(closeCallback);
         partitionedProducerCreatedPromise_.setFailed(result);
-        LOG_DEBUG("Unable to create Producer for partition - " << partitionIndex << " Error - " << result);
+        LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " << result);
         return;
     }
 
@@ -204,17 +204,17 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
 void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
     setState(Closing);
 
-    int producerIndex = 0;
     unsigned int producerAlreadyClosed = 0;
 
     // Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
     // when `state_` is Ready
-    for (ProducerList::const_iterator i = producers_.begin(); i != producers_.end(); i++) {
-        ProducerImplPtr prod = *i;
-        if (!prod->isClosed()) {
-            prod->closeAsync(std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerClose,
-                                       shared_from_this(), std::placeholders::_1, producerIndex,
-                                       closeCallback));
+    for (auto& producer : producers_) {
+        if (!producer->isClosed()) {
+            auto self = shared_from_this();
+            const auto partition = static_cast<unsigned int>(producer->partition());
+            producer->closeAsync([this, self, partition, closeCallback](Result result) {
+                handleSinglePartitionProducerClose(result, partition, closeCallback);
+            });
         } else {
             producerAlreadyClosed++;
         }
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 0a57da8d..25f628c 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -90,6 +90,8 @@ class ProducerImpl : public HandlerBase,
 
     uint64_t getProducerId() const;
 
+    int32_t partition() const noexcept { return partition_; }
+
     virtual void start();
 
     virtual void shutdown();


[pulsar] 20/20: Fix conflict

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 63d407854f4a0d940b07f25d02ac3b1c5723ef72
Author: Hao Zhang <zh...@cmss.chinamobile.com>
AuthorDate: Thu Jul 2 12:50:00 2020 +0800

    Fix conflict
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
---
 .../pulsar/broker/admin/impl/NamespacesBase.java       | 18 ++++++++++--------
 .../org/apache/pulsar/broker/admin/NamespacesTest.java | 15 ++++++++++-----
 2 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 07ed1e8..812ebd0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -28,7 +28,6 @@ import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_PO
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets.SetView;
-
 import java.net.URI;
 import java.net.URL;
 import java.util.Collections;
@@ -43,15 +42,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
-
-import org.apache.bookkeeper.mledger.LedgerOffloader;
-import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -65,6 +60,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
@@ -72,16 +68,16 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
-import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
@@ -101,7 +97,6 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.data.Stat;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,6 +105,13 @@ public abstract class NamespacesBase extends AdminResource {
     private static final long MAX_BUNDLES = ((long) 1) << 32;
 
     protected List<String> internalGetTenantNamespaces(String tenant) {
+        checkNotNull(tenant, "Tenant should not be null");
+        try {
+            NamedEntity.checkName(tenant);
+        } catch (IllegalArgumentException e) {
+            log.warn("[{}] Tenant name is invalid {}", clientAppId(), tenant, e);
+            throw new RestException(Status.PRECONDITION_FAILED, "Tenant name is not valid");
+        }
         validateTenantOperation(tenant, TenantOperation.LIST_NAMESPACES);
 
         try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 366f0e8..c0748ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -37,7 +37,6 @@ import static org.testng.Assert.fail;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
@@ -49,7 +48,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
@@ -57,7 +55,6 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
-
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -97,12 +94,12 @@ import org.apache.zookeeper.ZooDefs;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class NamespacesTest extends MockedPulsarServiceBaseTest {
     private static final Logger log = LoggerFactory.getLogger(NamespacesTest.class);
@@ -253,6 +250,14 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         assertEquals(namespaces.getTenantNamespaces(this.testTenant), expectedList);
 
         try {
+            // check the tenant name is valid
+            namespaces.getTenantNamespaces(this.testTenant + "/default");
+            fail("should have failed");
+        } catch (RestException e) {
+            assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
+        }
+
+        try {
             namespaces.getTenantNamespaces("non-existing-tenant");
             fail("should have failed");
         } catch (RestException e) {


[pulsar] 10/20: [PIP-55][Doc]--Update security overview (#7302)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 231aaa4547fed228ba7cd1acba6efbec97f31e39
Author: HuanliMeng <48...@users.noreply.github.com>
AuthorDate: Sun Jun 28 07:53:49 2020 +0800

    [PIP-55][Doc]--Update security overview (#7302)
    
    
    This PR is to update docs for PIP-55: https://github.com/apache/pulsar/pull/6074
    
    ### Motivation
    
    Provide a general description of how the authentication refresh functionality is implemented.
    
    ### Modifications
    
    Update the Security overview for PIP 55.
    
    The `authenticationRefreshCheckSeconds` config was added through the PR: https://github.com/apache/pulsar/pull/6074
    
    (cherry picked from commit 29b81ed090390506dd0d34beed66c719888a558c)
---
 site2/docs/security-overview.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/site2/docs/security-overview.md b/site2/docs/security-overview.md
index 7fc177e..a6de902 100644
--- a/site2/docs/security-overview.md
+++ b/site2/docs/security-overview.md
@@ -10,6 +10,10 @@ By default, Pulsar configures no encryption, authentication, or authorization. A
 
 Pulsar supports a pluggable authentication mechanism. And Pulsar clients use this mechanism to authenticate with brokers and proxies. You can also configure Pulsar to support multiple authentication sources.
 
+The Pulsar broker validates the authentication credentials when a connection is established. After the initial connection is authenticated, the "principal" token is stored for authorization even though the connection is not re-authenticated. The broker periodically checks the expiration status of every `ServerCnx` object. You can set `authenticationRefreshCheckSeconds` on the broker to control how frequently the expiration status is checked. By default, the `authenticationRefreshCheckSeconds [...]
+
+The broker can learn whether a particular client supports authentication refreshing. If a client supports authentication refreshing and the credential has expired, the authentication provider calls the `refreshAuthentication` method to initiate the refresh process. If a client does not support authentication refreshing and the credential has expired, the broker disconnects the client.
+
 You should secure the service components in your Apache Pulsar deployment.
 
 ## Role tokens
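
As a concrete illustration of the paragraphs added above, a minimal broker.conf sketch (60 is the assumed default value; adjust it to your environment):

```properties
# How often the broker checks every ServerCnx for expired authentication data.
# Clients that support refreshing are asked to re-authenticate; clients that do not
# are disconnected once their credential expires.
authenticationRefreshCheckSeconds=60
```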


[pulsar] 15/20: Add more detail information of retry errors (#7341)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a050d7809ed104e55cd472725a7b5f6601521678
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Mon Jul 6 15:36:02 2020 +0800

    Add more detail information of retry errors (#7341)
    
    ---
    
    *Motivation*
    
    Currently, when connecting to a wrong address, the error
    log does not show detailed information to let you know why
    it failed. Add the failure reason so you know more about the error.
    
    Before adding the failure reason:
    
    ```
    bin/pulsar-admin topics list public/default
    null
    
    Reason: java.util.concurrent.CompletionException: org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: Could not complete the operation. Number of retries has been exhausted.
    ```
    
    After adding the failure reason:
    
    ```
    bin/pulsar-admin topics list public/default
    null
    
    Reason: java.util.concurrent.CompletionException: org.apache.pulsar.client.admin.internal.http.AsyncHttpConnector$RetryException: Could not complete the operation. Number of retries has been exhausted.Connection refused: localhost/127.0.0.1:8081
    ```
    
    (cherry picked from commit f4547ecfe37c6752251699aadf8c19b380f42bc9)
---
 .../apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index eca1800..c0dc467 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -240,8 +240,9 @@ public class AsyncHttpConnector implements Connector {
                                             retries - 1);
                                 } else {
                                     resultFuture.completeExceptionally(
-                                            new RetryException("Could not complete the operation. Number of retries "
-                                            + "has been exhausted.", throwable));
+                                        new RetryException("Could not complete the operation. Number of retries "
+                                            + "has been exhausted. Failed reason: " + throwable.getMessage(),
+                                            throwable));
                                 }
                             }
                         } else {


[pulsar] 17/20: Fix bug related to managedLedger properties (#7357)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8c6121542e6ff28cb217782e49917e1817595576
Author: Hao Zhang <zh...@cmss.chinamobile.com>
AuthorDate: Mon Jun 29 02:19:37 2020 +0800

    Fix bug related to managedLedger properties (#7357)
    
    * Remove the re-read from ZooKeeper, and use the same mutex when updating metadata.
    
    * Add setProperty() and deleteProperty() APIs, and test that the ledger changes when metadata is added.
    
    * Add asyncSetProperty() and asyncDeleteProperty() APIs and related unit tests.
    
    * Fix unit test.
    
    * Fix exception propagation.
    
    (cherry picked from commit a3a63a35c9ff406be05bcfd388f520d40580954e)
---
 .../apache/bookkeeper/mledger/AsyncCallbacks.java  |   6 +-
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  46 ++++++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 135 +++++++++++++--------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 110 +++++++++++++----
 4 files changed, 221 insertions(+), 76 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index 0add10f..1ced748 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -139,9 +139,9 @@ public interface AsyncCallbacks {
         void offloadFailed(ManagedLedgerException exception, Object ctx);
     }
 
-    interface SetPropertiesCallback {
-        void setPropertiesComplete(Map<String, String> properties, Object ctx);
+    interface UpdatePropertiesCallback {
+        void updatePropertiesComplete(Map<String, String> properties, Object ctx);
 
-        void setPropertiesFailed(ManagedLedgerException exception, Object ctx);
+        void updatePropertiesFailed(ManagedLedgerException exception, Object ctx);
     }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 4f4b8ca..a8a4509 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -451,11 +451,51 @@ public interface ManagedLedger {
     Map<String, String> getProperties();
 
     /**
+     * Add key-value to propertiesMap.
+     *
+     * @param key key of property to add
+     * @param value value of property to add
+     * @throws InterruptedException
+     * @throws ManagedLedgerException
+     */
+    void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException;
+
+    /**
+     * Async add key-value to propertiesMap.
+     *
+     * @param key      key of property to add
+     * @param value    value of property to add
+     * @param callback a callback which will be supplied with the newest properties in managedLedger.
+     * @param ctx      a context object which will be passed to the callback on completion.
+     **/
+    void asyncSetProperty(String key, String value, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
+
+    /**
+     * Delete the property by key.
+     *
+     * @param key key of property to delete
+     * @throws InterruptedException
+     * @throws ManagedLedgerException
+     */
+    void deleteProperty(String key) throws InterruptedException, ManagedLedgerException;
+
+    /**
+     * Async delete the property by key.
+     *
+     * @param key      key of property to delete
+     * @param callback a callback which will be supplied with the newest properties in managedLedger.
+     * @param ctx      a context object which will be passed to the callback on completion.
+     */
+    void asyncDeleteProperty(String key, final AsyncCallbacks.UpdatePropertiesCallback callback, Object ctx);
+
+    /**
      * Update managed-ledger's properties.
      *
      * @param properties key-values of properties
+     * @throws InterruptedException
+     * @throws ManagedLedgerException
      */
-    void setProperties(Map<String, String> properties) throws InterruptedException;
+    void setProperties(Map<String, String> properties) throws InterruptedException, ManagedLedgerException;
 
     /**
      * Async update managed-ledger's properties.
@@ -464,9 +504,9 @@ public interface ManagedLedger {
      * @param callback   a callback which will be supplied with the newest properties in managedLedger.
      * @param ctx        a context object which will be passed to the callback on completion.
      */
-    void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.SetPropertiesCallback callback,
+    void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.UpdatePropertiesCallback callback,
         Object ctx);
-  
+
     /**
      * Trim consumed ledgers in background
      * @param promise
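
A brief usage sketch of the property API defined above. It assumes an already-initialized ManagedLedgerFactory (as set up in the unit tests further down), so it is illustrative rather than a standalone program:

```java
import java.util.Collections;
import java.util.Map;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;

public class ManagedLedgerPropertiesExample {

    static void updateProperties(ManagedLedgerFactory factory) throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger");

        // Synchronous variants: add or overwrite a single key, replace in bulk, delete a key.
        ledger.setProperty("owner", "tenant-a");
        ledger.setProperties(Collections.singletonMap("component", "compaction"));
        ledger.deleteProperty("component");

        // Asynchronous variant using the renamed UpdatePropertiesCallback.
        ledger.asyncSetProperty("owner", "tenant-b", new AsyncCallbacks.UpdatePropertiesCallback() {
            @Override
            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
                // `properties` is the latest full property map of the managed ledger.
            }

            @Override
            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
                // The metadata-store update failed; the exception carries the cause.
            }
        }, null);
    }
}
```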
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 27d8848..f45b341 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
 import static java.lang.Math.min;
 import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.ImmutableMap;
@@ -31,12 +30,10 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Range;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
-
 import java.time.Clock;
 import java.util.Collections;
 import java.util.HashMap;
@@ -67,7 +64,6 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -90,8 +86,8 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.SetPropertiesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.UpdatePropertiesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -176,10 +172,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private ScheduledFuture<?> timeoutTask;
 
     /**
-     * This lock is held while the ledgers list is updated asynchronously on the metadata store. Since we use the store
+     * This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store
      * version, we cannot have multiple concurrent updates.
      */
-    private final CallbackMutex ledgersListMutex = new CallbackMutex();
+    private final CallbackMutex metadataMutex = new CallbackMutex();
     private final CallbackMutex trimmerMutex = new CallbackMutex();
 
     private final CallbackMutex offloadMutex = new CallbackMutex();
@@ -1230,7 +1226,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                         log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
                     }
                     ledgersStat = stat;
-                    ledgersListMutex.unlock();
+                    metadataMutex.unlock();
                     updateLedgersIdsComplete(stat);
                     synchronized (ManagedLedgerImpl.this) {
                         mbean.addLedgerSwitchLatencySample(System.nanoTime() - lastLedgerCreationInitiationTimestamp,
@@ -1267,7 +1263,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                         }
                     }, null);
 
-                    ledgersListMutex.unlock();
+                    metadataMutex.unlock();
 
                     synchronized (ManagedLedgerImpl.this) {
                         lastLedgerCreationFailureTimestamp = clock.millis();
@@ -1282,7 +1278,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {
-        if (!ledgersListMutex.tryLock()) {
+        if (!metadataMutex.tryLock()) {
             // Defer update for later
             scheduledExecutor.schedule(() -> updateLedgersListAfterRollover(callback), 100, TimeUnit.MILLISECONDS);
             return;
@@ -2062,7 +2058,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             }
 
             if (STATE_UPDATER.get(this) == State.CreatingLedger // Give up now and schedule a new trimming
-                    || !ledgersListMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
+                    || !metadataMutex.tryLock()) { // Avoid deadlocks with other operations updating the ledgers list
                 scheduleDeferredTrimming(promise);
                 trimmerMutex.unlock();
                 return;
@@ -2101,7 +2097,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     log.info("[{}] End TrimConsumedLedgers. ledgers={} totalSize={}", name, ledgers.size(),
                             TOTAL_SIZE_UPDATER.get(ManagedLedgerImpl.this));
                     ledgersStat = stat;
-                    ledgersListMutex.unlock();
+                    metadataMutex.unlock();
                     trimmerMutex.unlock();
 
                     for (LedgerInfo ls : ledgersToDelete) {
@@ -2119,7 +2115,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 @Override
                 public void operationFailed(MetaStoreException e) {
                     log.warn("[{}] Failed to update the list of ledgers after trimming", name, e);
-                    ledgersListMutex.unlock();
+                    metadataMutex.unlock();
                     trimmerMutex.unlock();
 
                     promise.completeExceptionally(e);
@@ -2531,7 +2527,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation transformation,
             CompletableFuture<Void> finalPromise) {
         synchronized (this) {
-            if (!ledgersListMutex.tryLock()) {
+            if (!metadataMutex.tryLock()) {
                 // retry in 100 milliseconds
                 scheduledExecutor.schedule(
                         safeRun(() -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise)), 100,
@@ -2539,7 +2535,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             } else { // lock acquired
                 CompletableFuture<Void> unlockingPromise = new CompletableFuture<>();
                 unlockingPromise.whenComplete((res, ex) -> {
-                    ledgersListMutex.unlock();
+                    metadataMutex.unlock();
                     if (ex != null) {
                         finalPromise.completeExceptionally(ex);
                     } else {
@@ -3020,6 +3016,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId())
                     .setEntryId(lastConfirmedEntry.getEntryId()));
         }
+        for (Map.Entry<String, String> property : propertiesMap.entrySet()) {
+            mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder()
+                    .setKey(property.getKey()).setValue(property.getValue()));
+        }
 
         return mlInfo.build();
     }
@@ -3271,57 +3271,96 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     @Override
-    public void setProperties(Map<String, String> properties) throws InterruptedException {
+    public void setProperty(String key, String value) throws InterruptedException, ManagedLedgerException {
+        Map<String, String> map = new HashMap<>();
+        map.put(key, value);
+        updateProperties(map, false, null);
+    }
+
+    @Override
+    public void asyncSetProperty(String key, String value, final UpdatePropertiesCallback callback, Object ctx) {
+        Map<String, String> map = new HashMap<>();
+        map.put(key, value);
+        asyncUpdateProperties(map, false, null, callback, ctx);
+    }
+
+    @Override
+    public void deleteProperty(String key) throws InterruptedException, ManagedLedgerException {
+        updateProperties(null, true, key);
+    }
+
+    @Override
+    public void asyncDeleteProperty(String key, final UpdatePropertiesCallback callback, Object ctx) {
+        asyncUpdateProperties(null, true, key, callback, ctx);
+    }
+
+    @Override
+    public void setProperties(Map<String, String> properties) throws InterruptedException, ManagedLedgerException {
+        updateProperties(properties, false, null);
+    }
+
+    @Override
+    public void asyncSetProperties(Map<String, String> properties, final UpdatePropertiesCallback callback,
+        Object ctx) {
+        asyncUpdateProperties(properties, false, null, callback, ctx);
+    }
+
+    private void updateProperties(Map<String, String> properties, boolean isDelete,
+        String deleteKey) throws InterruptedException, ManagedLedgerException {
         final CountDownLatch latch = new CountDownLatch(1);
-        this.asyncSetProperties(properties, new SetPropertiesCallback() {
+        class Result {
+            ManagedLedgerException exception = null;
+        }
+        final Result result = new Result();
+        this.asyncUpdateProperties(properties, isDelete, deleteKey, new UpdatePropertiesCallback() {
             @Override
-            public void setPropertiesComplete(Map<String, String> properties, Object ctx) {
+            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
                 latch.countDown();
             }
 
             @Override
-            public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) {
-                log.error("[{}] Update manageLedger's info failed:{}", name, exception.getMessage());
+            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                result.exception = exception;
                 latch.countDown();
             }
         }, null);
 
-        latch.await();
+        if (!latch.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) {
+            throw new ManagedLedgerException("Timeout during update managedLedger's properties");
+        }
+
+        if (result.exception != null) {
+            log.error("[{}] Update managedLedger's properties failed", name, result.exception);
+            throw result.exception;
+        }
     }
 
-    @Override
-    public void asyncSetProperties(Map<String, String> properties, final SetPropertiesCallback callback, Object ctx) {
-        store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() {
+    private void asyncUpdateProperties(Map<String, String> properties, boolean isDelete,
+        String deleteKey, final UpdatePropertiesCallback callback, Object ctx) {
+        if (!metadataMutex.tryLock()) {
+            // Defer update for later
+            scheduledExecutor.schedule(() -> asyncUpdateProperties(properties, isDelete, deleteKey,
+                callback, ctx), 100, TimeUnit.MILLISECONDS);
+            return;
+        }
+        if (isDelete) {
+            propertiesMap.remove(deleteKey);
+        } else {
+            propertiesMap.putAll(properties);
+        }
+        store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStoreCallback<Void>() {
             @Override
-            public void operationComplete(ManagedLedgerInfo result, Stat version) {
+            public void operationComplete(Void result, Stat version) {
                 ledgersStat = version;
-                // Update manageLedger's  properties.
-                ManagedLedgerInfo.Builder info = ManagedLedgerInfo.newBuilder(result);
-                info.clearProperties();
-                for (Map.Entry<String, String> property : properties.entrySet()) {
-                    info.addProperties(MLDataFormats.KeyValue.newBuilder().setKey(property.getKey()).setValue(property.getValue()));
-                }
-                store.asyncUpdateLedgerIds(name, info.build(), version, new MetaStoreCallback<Void>() {
-                    @Override
-                    public void operationComplete(Void result, Stat version) {
-                        ledgersStat = version;
-                        propertiesMap.clear();
-                        propertiesMap.putAll(properties);
-                        callback.setPropertiesComplete(properties, ctx);
-                    }
-
-                    @Override
-                    public void operationFailed(MetaStoreException e) {
-                        log.error("[{}] Update manageLedger's info failed:{}", name, e.getMessage());
-                        callback.setPropertiesFailed(e, ctx);
-                    }
-                });
+                callback.updatePropertiesComplete(propertiesMap, ctx);
+                metadataMutex.unlock();
             }
 
             @Override
             public void operationFailed(MetaStoreException e) {
-                log.error("[{}] Get manageLedger's info failed:{}", name, e.getMessage());
-                callback.setPropertiesFailed(e, ctx);
+                log.error("[{}] Update managedLedger's properties failed", name, e);
+                callback.updatePropertiesFailed(e, ctx);
+                metadataMutex.unlock();
             }
         });
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index e507c99..2806a10 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -63,7 +64,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
-import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -95,7 +95,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -1168,7 +1167,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
     }
 
     @Test
-    public void testSetProperties() throws Exception {
+    public void testUpdateProperties() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger");
         Map<String, String> properties = new HashMap<>();
         properties.put("key1", "value1");
@@ -1177,40 +1176,107 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.setProperties(properties);
         assertEquals(ledger.getProperties(), properties);
 
+        properties.put("key4", "value4");
+        ledger.setProperty("key4", "value4");
+        assertEquals(ledger.getProperties(), properties);
+
+        ledger.deleteProperty("key4");
+        properties.remove("key4");
+        assertEquals(ledger.getProperties(), properties);
+
         Map<String, String> newProperties = new HashMap<>();
-        newProperties.put("key4", "value4");
         newProperties.put("key5", "value5");
-        newProperties.put("key6", "value6");
+        newProperties.put("key1", "value6");
+        newProperties.putAll(properties);
         ledger.setProperties(newProperties);
         assertEquals(ledger.getProperties(), newProperties);
     }
 
     @Test
-    public void testAsyncSetProperties() throws Exception {
-        final CountDownLatch latch = new CountDownLatch(1);
+    public void testAsyncUpdateProperties() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(3);
         ManagedLedger ledger = factory.open("my_test_ledger");
-        Map<String, String> properties = new HashMap<>();
-        properties.put("key1", "value1");
-        properties.put("key2", "value2");
-        properties.put("key3", "value3");
-        ledger.setProperties(properties);
-        Map<String, String> newProperties = new HashMap<>();
-        newProperties.put("key4", "value4");
-        newProperties.put("key5", "value5");
-        newProperties.put("key6", "value6");
-        ledger.asyncSetProperties(newProperties, new AsyncCallbacks.SetPropertiesCallback() {
+        Map<String, String> prop = new HashMap<>();
+        prop.put("key1", "value1");
+        prop.put("key2", "value2");
+        prop.put("key3", "value3");
+        ledger.asyncSetProperties(prop, new AsyncCallbacks.UpdatePropertiesCallback() {
             @Override
-            public void setPropertiesComplete(Map<String, String> properties, Object ctx) {
+            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                assertEquals(prop, properties);
                 latch.countDown();
             }
 
             @Override
-            public void setPropertiesFailed(ManagedLedgerException exception, Object ctx) {
-                fail("should have succeeded");
+            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
             }
         }, null);
-        latch.await();
-        assertEquals(ledger.getProperties(), newProperties);
+
+        ledger.asyncSetProperty("key4", "value4", new AsyncCallbacks.UpdatePropertiesCallback() {
+            @Override
+            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                assertNotNull(properties.get("key4"));
+                assertEquals("value4", properties.get("key4"));
+                latch.countDown();
+            }
+
+            @Override
+            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+            }
+        }, null);
+
+        prop.remove("key1");
+        ledger.asyncDeleteProperty("key1", new AsyncCallbacks.UpdatePropertiesCallback() {
+            @Override
+            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                assertNull(properties.get("key1"));
+                latch.countDown();
+            }
+
+            @Override
+            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+            }
+        }, null);
+        assertTrue(latch.await(60, TimeUnit.SECONDS));
+    }
+
+    @Test
+    public void testConcurrentAsyncSetProperties() throws Exception {
+        final CountDownLatch latch = new CountDownLatch(1000);
+        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
+        ExecutorService executor = Executors.newCachedThreadPool();
+        for (int i = 0; i < 1000; i++) {
+            final int finalI = i;
+            executor.execute(() -> {
+                Map<String, String> newProperties = new HashMap<>();
+                newProperties.put("key0", String.valueOf(finalI));
+                newProperties.put("key1", "value1");
+                newProperties.put("key2", "value2");
+                newProperties.put("key3", "value3");
+                ledger.asyncSetProperties(newProperties, new AsyncCallbacks.UpdatePropertiesCallback() {
+                    @Override
+                    public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
+                        assertEquals(properties, newProperties);
+                        latch.countDown();
+                    }
+
+                    @Override
+                    public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
+                    }
+                }, null);
+            });
+        }
+        try {
+            for (int i = 0; i < 100; i++) {
+                ledger.addEntry("data".getBytes(Encoding));
+                Thread.sleep(300);
+            }
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+        assertTrue(latch.await(300, TimeUnit.SECONDS));
+        executor.shutdown();
+        factory.shutdown();
     }
 
     @Test


[pulsar] 16/20: Handling error in creation of non-durable cursor (#7355)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4b103cd9c742392a16f815c6b709dc9491882c13
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jun 26 21:11:40 2020 -0700

    Handling error in creation of non-durable cursor (#7355)
    
    * Handling error in creation of non-durable cursor
    
    * Fixed tests
    
    (cherry picked from commit 0a5f0a065e76fd6dff6243441d2cfbf25f0ee3dc)
---
 .../broker/service/persistent/PersistentTopic.java | 57 +++++++++++-----------
 1 file changed, 29 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d7fcd41..ca7fbd4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -147,7 +147,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
 
     private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
-    
+
     // topic has every published chunked message since topic is loaded
     public boolean msgChunkPublished;
 
@@ -688,12 +688,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
     private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName,
             MessageId startMessageId, long startMessageRollbackDurationSec) {
-        CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
         log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);
 
         synchronized (ledger) {
             // Create a new non-durable cursor only for the first consumer that connects
-            Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> {
+            PersistentSubscription subscription = subscriptions.get(subscriptionName);
+
+            if (subscription == null) {
                 MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
                         : (MessageIdImpl) MessageId.latest;
 
@@ -702,7 +703,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 // Ensure that the start message id starts from a valid entry.
                 if (ledgerId >= 0 && entryId >= 0
                         && msgId instanceof BatchMessageIdImpl) {
-                    // When the start message is relative to a batch, we need to take one step back on the previous message,
+                    // When the start message is relative to a batch, we need to take one step back on the previous
+                    // message,
                     // because the "batch" might not have been consumed in its entirety.
                     // The client will then be able to discard the first messages if needed.
                     entryId = msgId.getEntryId() - 1;
@@ -713,32 +715,31 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 try {
                     cursor = ledger.newNonDurableCursor(startPosition, subscriptionName);
                 } catch (ManagedLedgerException e) {
-                    subscriptionFuture.completeExceptionally(e);
+                    return FutureUtil.failedFuture(e);
                 }
-            return new PersistentSubscription(this, subscriptionName, cursor, false);
-            });
 
-            if (!subscriptionFuture.isDone()) {
-                if (startMessageRollbackDurationSec > 0) {
-                    long timestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
-                    subscription.resetCursor(timestamp).handle((s, ex) -> {
-                        if (ex != null) {
-                            log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
-                                    startMessageRollbackDurationSec);
-                        }
-                        subscriptionFuture.complete(subscription);
-                        return null;
-                    });
-                } else {
-                    subscriptionFuture.complete(subscription);
-                }
+                subscription = new PersistentSubscription(this, subscriptionName, cursor, false);
+                subscriptions.put(subscriptionName, subscription);
+            }
+
+            if (startMessageRollbackDurationSec > 0) {
+                long timestamp = System.currentTimeMillis()
+                        - TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
+                CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
+                final Subscription finalSubscription = subscription;
+                subscription.resetCursor(timestamp).handle((s, ex) -> {
+                    if (ex != null) {
+                        log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
+                                startMessageRollbackDurationSec);
+                    }
+                    subscriptionFuture.complete(finalSubscription);
+                    return null;
+                });
+                return subscriptionFuture;
             } else {
-                // failed to initialize managed-cursor: clean up created subscription
-                subscriptions.remove(subscriptionName);
+                return CompletableFuture.completedFuture(subscription);
             }
         }
-
-        return subscriptionFuture;
     }
 
     @Override
@@ -936,7 +937,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     public CompletableFuture<Void> close() {
         return close(false);
     }
-    
+
     /**
      * Close this topic - close all producers and subscriptions associated with this topic
      *
@@ -967,7 +968,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
         producers.values().forEach(producer -> futures.add(producer.disconnect()));
         subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
-        
+
         CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
                 : FutureUtil.waitForAll(futures);
 
@@ -1794,7 +1795,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
 
         initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
-        
+
         this.updateMaxPublishRate(data);
 
         producers.values().forEach(producer -> {


[pulsar] 02/20: Fix hash range conflict issue in Key_Shared with sticky hash range (#7231)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1593dfad5ce1b9520c956fefa86fd9e2037302b1
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 11 08:43:54 2020 +0800

    Fix hash range conflict issue in Key_Shared with sticky hash range (#7231)
    
    Fix hash range conflict issue in Key_Shared with sticky hash range
    
    (cherry picked from commit e1c04ef2026c5f6a9a7229a05cedee3ff9dbf7ee)
---
 ...ashRangeExclusiveStickyKeyConsumerSelector.java |  9 ++-
 .../client/api/KeySharedSubscriptionTest.java      | 64 ++++++++++++++++++++++
 .../pulsar/client/api/PulsarClientException.java   | 19 +++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  2 +
 4 files changed, 93 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index dc96fbb..0276f42 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -106,7 +106,14 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon
             }
 
             if (ceilingEntry != null && floorEntry != null && ceilingEntry.getValue().equals(floorEntry.getValue())) {
-                throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + ceilingEntry.getValue());
+                PulsarApi.KeySharedMeta keySharedMeta = ceilingEntry.getValue().getKeySharedMeta();
+                for (PulsarApi.IntRange range : keySharedMeta.getHashRangesList()) {
+                    int start = Math.max(intRange.getStart(), range.getStart());
+                    int end = Math.min(intRange.getEnd(), range.getEnd());
+                    if (end >= start) {
+                        throw new BrokerServiceException.ConsumerAssignException("Range conflict with consumer " + ceilingEntry.getValue());
+                    }
+                }
             }
         }
     }
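
The rejection above fires only when the requested sticky range actually intersects an existing consumer's range. A minimal, self-contained sketch of the same closed-interval intersection test (the class name and example ranges here are illustrative, not the broker's actual types):

    public class RangeOverlapSketch {
        // Two closed integer ranges [aStart, aEnd] and [bStart, bEnd] intersect
        // exactly when max(aStart, bStart) <= min(aEnd, bEnd), which is the
        // "end >= start" condition checked in the selector above.
        static boolean overlaps(int aStart, int aEnd, int bStart, int bEnd) {
            int start = Math.max(aStart, bStart);
            int end = Math.min(aEnd, bEnd);
            return end >= start;
        }

        public static void main(String[] args) {
            System.out.println(overlaps(0, 99, 50, 100));   // true  -> conflict, consumer rejected
            System.out.println(overlaps(0, 99, 100, 399));  // false -> no conflict, consumer accepted
        }
    }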
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 8c6c033..511f8c2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -655,6 +655,70 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         }
     }
 
+    @Test
+    public void testHashRangeConflict() throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        final String topic = "testHashRangeConflict-" + UUID.randomUUID().toString();
+        final String sub = "test";
+
+        Consumer<String> consumer1 = createFixedHashRangesConsumer(topic, sub, Range.of(0,99), Range.of(400, 65535));
+        Assert.assertTrue(consumer1.isConnected());
+
+        Consumer<String> consumer2 = createFixedHashRangesConsumer(topic, sub, Range.of(100,399));
+        Assert.assertTrue(consumer2.isConnected());
+
+        try {
+            createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
+            Assert.fail("Should failed with conflict range.");
+        } catch (PulsarClientException.ConsumerAssignException ignore) {
+        }
+
+        try {
+            createFixedHashRangesConsumer(topic, sub, Range.of(1,1));
+            Assert.fail("Should failed with conflict range.");
+        } catch (PulsarClientException.ConsumerAssignException ignore) {
+        }
+
+        consumer1.close();
+
+        try {
+            createFixedHashRangesConsumer(topic, sub, Range.of(0, 65535));
+            Assert.fail("Should failed with conflict range.");
+        } catch (PulsarClientException.ConsumerAssignException ignore) {
+        }
+
+        try {
+            createFixedHashRangesConsumer(topic, sub, Range.of(50,100));
+            Assert.fail("Should failed with conflict range.");
+        } catch (PulsarClientException.ConsumerAssignException ignore) {
+        }
+
+        try {
+            createFixedHashRangesConsumer(topic, sub, Range.of(399,500));
+            Assert.fail("Should failed with conflict range.");
+        } catch (PulsarClientException.ConsumerAssignException ignore) {
+        }
+
+        Consumer<String> consumer3 = createFixedHashRangesConsumer(topic, sub, Range.of(400,600));
+        Assert.assertTrue(consumer3.isConnected());
+
+        Consumer<String> consumer4 = createFixedHashRangesConsumer(topic, sub, Range.of(50,99));
+        Assert.assertTrue(consumer4.isConnected());
+
+        consumer2.close();
+        consumer3.close();
+        consumer4.close();
+    }
+
+    private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
+        return pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscriptionName(subscription)
+                .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(ranges))
+                .subscribe();
+    }
+
     private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException {
         Producer<Integer> producer = null;
         if (enableBatch) {
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index b6e327c..6a6e42a 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -732,6 +732,20 @@ public class PulsarClientException extends IOException {
         }
     }
 
+    /**
+     * Consumer assign exception thrown by Pulsar client.
+     */
+    public static class ConsumerAssignException extends PulsarClientException {
+
+        /**
+         * Constructs an {@code ConsumerAssignException} with the specified detail message.
+         * @param msg The detail message.
+         */
+        public ConsumerAssignException(String msg) {
+            super(msg);
+        }
+    }
+
     // wrap an exception to enriching more info messages.
     public static Throwable wrap(Throwable t, String msg) {
         msg += "\n" + t.getMessage();
@@ -786,6 +800,8 @@ public class PulsarClientException extends IOException {
             return new ChecksumException(msg);
         } else if (t instanceof CryptoException) {
             return new CryptoException(msg);
+        } else if (t instanceof ConsumerAssignException) {
+            return new ConsumerAssignException(msg);
         } else if (t instanceof PulsarClientException) {
             return new PulsarClientException(msg);
         } else if (t instanceof CompletionException) {
@@ -867,6 +883,8 @@ public class PulsarClientException extends IOException {
             return new ChecksumException(msg);
         } else if (cause instanceof CryptoException) {
             return new CryptoException(msg);
+        } else if (cause instanceof ConsumerAssignException) {
+            return new ConsumerAssignException(msg);
         } else if (cause instanceof TopicDoesNotExistException) {
             return new TopicDoesNotExistException(msg);
         } else {
@@ -895,6 +913,7 @@ public class PulsarClientException extends IOException {
                 || t instanceof NotSupportedException
                 || t instanceof ChecksumException
                 || t instanceof CryptoException
+                || t instanceof ConsumerAssignException
                 || t instanceof ProducerBusyException
                 || t instanceof ConsumerBusyException) {
             return false;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 6dde157..d63cbc1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1001,6 +1001,8 @@ public class ClientCnx extends PulsarHandler {
             return new PulsarClientException.IncompatibleSchemaException(errorMsg);
         case TopicNotFound:
             return new PulsarClientException.TopicDoesNotExistException(errorMsg);
+        case ConsumerAssignError:
+            return new PulsarClientException.ConsumerAssignException(errorMsg);
         case UnknownError:
         default:
             return new PulsarClientException(errorMsg);
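
Putting the broker-side check and the new client-side mapping together, a consumer that requests a sticky hash range overlapping an existing consumer on the same Key_Shared subscription now fails with ConsumerAssignException. A minimal usage sketch, assuming an already-built PulsarClient named `pulsarClient` and hypothetical topic and subscription names:

    Consumer<String> first = pulsarClient.newConsumer(Schema.STRING)
            .topic("my-topic")                       // hypothetical topic name
            .subscriptionName("my-sub")              // hypothetical subscription name
            .subscriptionType(SubscriptionType.Key_Shared)
            .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 99)))
            .subscribe();

    try {
        // Overlaps [0, 99], so the broker answers with ConsumerAssignError,
        // which the client maps to ConsumerAssignException (see the ClientCnx change above).
        pulsarClient.newConsumer(Schema.STRING)
                .topic("my-topic")
                .subscriptionName("my-sub")
                .subscriptionType(SubscriptionType.Key_Shared)
                .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(50, 150)))
                .subscribe();
    } catch (PulsarClientException.ConsumerAssignException e) {
        // Pick a non-overlapping range (or close the conflicting consumer) and retry.
    }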


[pulsar] 18/20: Use fully qualified hostname as default to advertise worker. (#7360)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b48120f59d50d2fef0a7551c1d7d74745e45453a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 25 20:29:06 2020 -0700

    Use fully qualified hostname as default to advertise worker. (#7360)
    
    ### Motivation
    
    Similar to #6235, we need to ensure that we always use the fully qualified hostname
    
    (cherry picked from commit bafb3732b9858c06023113d6662aabafc5910319)
---
 .../src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index c1280ca..bb8d896 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -493,7 +493,8 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
 
     public static String unsafeLocalhostResolve() {
         try {
-            return InetAddress.getLocalHost().getHostName();
+            // Get the fully qualified hostname
+            return InetAddress.getLocalHost().getCanonicalHostName();
         } catch (UnknownHostException ex) {
             throw new IllegalStateException("Failed to resolve localhost name.", ex);
         }
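
For context, getHostName() typically returns the short host name, while getCanonicalHostName() performs a reverse lookup to obtain the fully qualified name; the exact output depends on the local resolver configuration. A small standalone sketch (the example host names are illustrative):

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    public class HostnameSketch {
        public static void main(String[] args) throws UnknownHostException {
            InetAddress localhost = InetAddress.getLocalHost();
            // Often a short name such as "worker-1", depending on the resolver
            System.out.println(localhost.getHostName());
            // Typically the fully qualified name, e.g. "worker-1.example.com"
            System.out.println(localhost.getCanonicalHostName());
        }
    }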


[pulsar] 08/20: typo (#7281)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3498f40519f072ce7176798fb9ff2bb307868893
Author: Alexandre DUVAL <ka...@gmail.com>
AuthorDate: Fri Jun 19 15:25:54 2020 +0200

    typo (#7281)
    
    
    (cherry picked from commit c3522503525379d0376ad3d1d42613abfc80e502)
---
 .../main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java   | 2 +-
 .../src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java    | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 42fd278..07ed1e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1457,7 +1457,7 @@ public abstract class NamespacesBase extends AdminResource {
     }
 
     protected void internalSetRetention(RetentionPolicies retention) {
-        validateNamespacePolicyOperation(namespaceName, PolicyName.REPLICATION, PolicyOperation.WRITE);
+        validateNamespacePolicyOperation(namespaceName, PolicyName.RETENTION, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index b018d42..366f0e8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -180,7 +180,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
 
         doThrow(new RestException(Status.UNAUTHORIZED, "unauthorized")).when(namespaces)
                 .validateNamespacePolicyOperation(NamespaceName.get("other-tenant/use/test-namespace-1"),
-                        PolicyName.REPLICATION, PolicyOperation.WRITE);
+                        PolicyName.RETENTION, PolicyOperation.WRITE);
 
         nsSvc = pulsar.getNamespaceService();
     }


[pulsar] 04/20: Update Jetty to 9.4.29 (#7235)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cbf5bb2fc45611acea3ccbf7c2cf655bd18612eb
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 29 22:44:44 2020 -0700

    Update Jetty to 9.4.29 (#7235)
    
    * Update Jetty to 9.4.29
    
    * Fixed test exception message expectation
    
    * Fixed test
    
    * Fixed difference in extracting exception message
    
    (cherry picked from commit 8a49e46c9488a0d8ddd554ab0217f057fe5cceec)
---
 distribution/server/src/assemble/LICENSE.bin.txt   | 34 +++++++++++-----------
 pom.xml                                            |  2 +-
 .../pulsar/broker/admin/AdminApiSchemaTest.java    |  3 ++
 .../pulsar/broker/service/BrokerServiceTest.java   |  8 ++---
 .../pulsar/client/admin/PulsarAdminException.java  |  2 +-
 pulsar-sql/presto-distribution/LICENSE             | 12 ++++----
 6 files changed, 32 insertions(+), 29 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 2eac008..2c15d6e 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -414,23 +414,23 @@ The Apache Software License, Version 2.0
     - org.asynchttpclient-async-http-client-2.12.1.jar
     - org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
  * Jetty
-    - org.eclipse.jetty-jetty-client-9.4.20.v20190813.jar
-    - org.eclipse.jetty-jetty-continuation-9.4.20.v20190813.jar
-    - org.eclipse.jetty-jetty-http-9.4.20.v20190813.jar
-    - org.eclipse.jetty-jetty-io-9.4.20.v20190813.jar
-    - org.eclipse.jetty-jetty-proxy-9.4.20.v20190813.jar
-    - org.eclipse.jetty-jetty-security-9.4.20.v20190813.jar
-    - org.eclipse.jetty-jetty-server-9.4.20.v20190813.jar
-    - org.eclipse.jetty-jetty-servlet-9.4.20.v20190813.jar
-    - org.eclipse.jetty-jetty-servlets-9.4.20.v20190813.jar
-    - org.eclipse.jetty-jetty-util-9.4.20.v20190813.jar
-    - org.eclipse.jetty-jetty-xml-9.4.20.v20190813.jar
-    - org.eclipse.jetty.websocket-javax-websocket-client-impl-9.4.20.v20190813.jar
-    - org.eclipse.jetty.websocket-websocket-api-9.4.20.v20190813.jar
-    - org.eclipse.jetty.websocket-websocket-client-9.4.20.v20190813.jar
-    - org.eclipse.jetty.websocket-websocket-common-9.4.20.v20190813.jar
-    - org.eclipse.jetty.websocket-websocket-server-9.4.20.v20190813.jar
-    - org.eclipse.jetty.websocket-websocket-servlet-9.4.20.v20190813.jar
+    - org.eclipse.jetty-jetty-client-9.4.29.v20200521.jar
+    - org.eclipse.jetty-jetty-continuation-9.4.29.v20200521.jar
+    - org.eclipse.jetty-jetty-http-9.4.29.v20200521.jar
+    - org.eclipse.jetty-jetty-io-9.4.29.v20200521.jar
+    - org.eclipse.jetty-jetty-proxy-9.4.29.v20200521.jar
+    - org.eclipse.jetty-jetty-security-9.4.29.v20200521.jar
+    - org.eclipse.jetty-jetty-server-9.4.29.v20200521.jar
+    - org.eclipse.jetty-jetty-servlet-9.4.29.v20200521.jar
+    - org.eclipse.jetty-jetty-servlets-9.4.29.v20200521.jar
+    - org.eclipse.jetty-jetty-util-9.4.29.v20200521.jar
+    - org.eclipse.jetty-jetty-xml-9.4.29.v20200521.jar
+    - org.eclipse.jetty.websocket-javax-websocket-client-impl-9.4.29.v20200521.jar
+    - org.eclipse.jetty.websocket-websocket-api-9.4.29.v20200521.jar
+    - org.eclipse.jetty.websocket-websocket-client-9.4.29.v20200521.jar
+    - org.eclipse.jetty.websocket-websocket-common-9.4.29.v20200521.jar
+    - org.eclipse.jetty.websocket-websocket-server-9.4.29.v20200521.jar
+    - org.eclipse.jetty.websocket-websocket-servlet-9.4.29.v20200521.jar
  * SnakeYaml -- org.yaml-snakeyaml-1.24.jar
  * RocksDB - org.rocksdb-rocksdbjni-5.13.3.jar
  * HttpClient
diff --git a/pom.xml b/pom.xml
index 819b0b8..bac783a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@ flexible messaging model and an intuitive client API.</description>
     <netty.version>4.1.48.Final</netty.version>
     <netty-tc-native.version>2.0.30.Final</netty-tc-native.version>
     <storm.version>2.0.0</storm.version>
-    <jetty.version>9.4.20.v20190813</jetty.version>
+    <jetty.version>9.4.29.v20200521</jetty.version>
     <jersey.version>2.27</jersey.version>
     <athenz.version>1.8.38</athenz.version>
     <prometheus.version>0.5.0</prometheus.version>
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index 5058b82..9cc8105 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.admin;
 import static java.nio.charset.StandardCharsets.US_ASCII;
 import static org.junit.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 import com.google.common.collect.Sets;
 import java.util.HashMap;
@@ -162,6 +163,7 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
 
         try {
             admin.schemas().createSchema(topicName, foo1SchemaInfo);
+            fail("Should have failed");
         } catch (PulsarAdminException.ConflictException e) {
             assertTrue(e.getMessage().contains("HTTP 409 Conflict"));
         }
@@ -171,6 +173,7 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
 
         try {
             admin.schemas().createSchema(topicName, fooSchemaInfo);
+            fail("Should have failed");
         } catch (PulsarAdminException.NotFoundException e) {
             assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 282f8ac..241072f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -574,7 +574,7 @@ public class BrokerServiceTest extends BrokerTestBase {
 
             fail("should fail");
         } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Authentication required"));
+            assertTrue(e.getMessage().contains("Unauthorized"));
         } finally {
             pulsarClient.close();
         }
@@ -636,7 +636,7 @@ public class BrokerServiceTest extends BrokerTestBase {
 
             fail("should fail");
         } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Authentication required"));
+            assertTrue(e.getMessage().contains("Unauthorized"));
         } finally {
             pulsarClient.close();
         }
@@ -654,7 +654,7 @@ public class BrokerServiceTest extends BrokerTestBase {
                     .subscribe();
             fail("should fail");
         } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Authentication required"));
+            assertTrue(e.getMessage().contains("Unauthorized"));
         } finally {
             pulsarClient.close();
         }
@@ -697,7 +697,7 @@ public class BrokerServiceTest extends BrokerTestBase {
                     .subscribe();
             fail("should fail");
         } catch (Exception e) {
-            assertTrue(e.getMessage().contains("Authentication required"));
+            assertTrue(e.getMessage().contains("Unauthorized"));
         } finally {
             pulsarClient.close();
         }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
index 4e79cfc..fa8dbf2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
@@ -40,7 +40,7 @@ public class PulsarAdminException extends Exception {
 
     private static String getReasonFromServer(WebApplicationException e) {
         try {
-            return e.getResponse().readEntity(ErrorData.class).reason;
+            return e.getResponse().readEntity(ErrorData.class).reason.toString();
         } catch (Exception ex) {
             try {
                 return ObjectMapperFactory.getThreadLocal().readValue(
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index e4d7f3a..9910f64 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -434,12 +434,12 @@ The Apache Software License, Version 2.0
   * Java Assist
     - javassist-3.25.0-GA.jar
   * Jetty
-    - jetty-http-9.4.20.v20190813.jar
-    - jetty-io-9.4.20.v20190813.jar
-    - jetty-security-9.4.20.v20190813.jar
-    - jetty-server-9.4.20.v20190813.jar
-    - jetty-servlet-9.4.20.v20190813.jar
-    - jetty-util-9.4.20.v20190813.jar
+    - jetty-http-9.4.29.v20200521.jar
+    - jetty-io-9.4.29.v20200521.jar
+    - jetty-security-9.4.29.v20200521.jar
+    - jetty-server-9.4.29.v20200521.jar
+    - jetty-servlet-9.4.29.v20200521.jar
+    - jetty-util-9.4.29.v20200521.jar
   * Java Native Access
     - jna-4.2.0.jar
   * Yahoo Datasketches


[pulsar] 03/20: Fix lookup permission error (#7234)

Posted by rx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e752e0f805cf2e8d2915c7dc77d8feb987343880
Author: ltamber <lt...@gmail.com>
AuthorDate: Thu Jun 11 15:59:22 2020 +0800

    Fix lookup permission error (#7234)
    
    ### Motivation
    Currently, when the Pulsar AuthorizationService checks lookup permission, a role that canProduce **or** canConsume should be allowed to look up the topic. However, in the code https://github.com/apache/pulsar/blob/master/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java#L267, if the canProduce or canConsume check throws an exception, `canLookup` simply rethrows it and never checks the other permission.
    
    ### Modification
    invoke `canLookupAsync` instead.
    
    (cherry picked from commit 834e2cb78e6354a8e74146f599b20ab2c75af5d9)
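
To illustrate the intended "either permission grants lookup" behavior, here is a minimal sketch of an or-composition of two async checks in which an exception from one check no longer masks a grant from the other. This is only an illustration of the idea, not Pulsar's actual canLookupAsync implementation, and the stand-in check methods are hypothetical:

    import java.util.concurrent.CompletableFuture;

    public class LookupCheckSketch {
        // Hypothetical stand-ins for canProduceAsync / canConsumeAsync.
        static CompletableFuture<Boolean> canProduceAsync() {
            return CompletableFuture.failedFuture(new RuntimeException("produce check failed"));
        }

        static CompletableFuture<Boolean> canConsumeAsync() {
            return CompletableFuture.completedFuture(true);
        }

        // Lookup is allowed if either check grants it; a failed check counts as
        // "not granted" instead of aborting the whole lookup.
        static CompletableFuture<Boolean> canLookupAsync() {
            return canProduceAsync()
                    .exceptionally(ex -> false)
                    .thenCompose(canProduce -> canProduce
                            ? CompletableFuture.completedFuture(true)
                            : canConsumeAsync().exceptionally(ex -> false));
        }

        public static void main(String[] args) {
            System.out.println(canLookupAsync().join()); // true, despite the failed produce check
        }
    }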
---
 .../pulsar/broker/authorization/AuthorizationService.java  | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index 0ced2bf..10b35ef 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -264,8 +264,18 @@ public class AuthorizationService {
      */
     public boolean canLookup(TopicName topicName, String role, AuthenticationDataSource authenticationData)
             throws Exception {
-        return canProduce(topicName, role, authenticationData)
-                || canConsume(topicName, role, authenticationData, null);
+        try {
+            return canLookupAsync(topicName, role, authenticationData)
+                    .get(conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
+        } catch (InterruptedException e) {
+            log.warn("Time-out {} sec while checking authorization on {} ", conf.getZooKeeperOperationTimeoutSeconds(),
+                    topicName);
+            throw e;
+        } catch (Exception e) {
+            log.warn("Role - {} failed to get lookup permissions for topic - {}. {}", role, topicName,
+                    e.getMessage());
+            throw e;
+        }
     }
 
     /**