Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/08/10 15:16:57 UTC

[pulsar] branch branch-2.9 updated (2c6db60abee -> e434a531a11)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 2c6db60abee [improve][authentication] Support for get token from HTTP params (#16987)
     new ba24b40a1f9 Exclude the Netty Reactive Stream from asynchttpclient (#16312)
     new 0cc42d9130d [fix][authorization] Fix multiple roles authorization (#16645)
     new 6bdffab17db add artifactSet to pom.xml for pulsar-functions-local-runner (#16565)
     new 4bc4659eadc fix kafka source config when consumerConfigProperties='' (#16731)
     new 6f33cdea054 [fix][broker] Fix stats-internal with option -m cause active ledger recover then close (#16662)
     new fbfc6bdbad6 [fix][client] Fix ReconsumeLater will hang up if retryLetterProducer exception (#16655)
     new dbfbea457e8 [fix][client] Fix load trust certificate (#16789)
     new 7b2f39cac55 [fix][proxy] Fix client service url (#16834)
     new 4fba95abb70 [Java Client] Send CloseConsumer on timeout (#16616)
     new 4f03d0056b4 [fix][function] Fix python instance not process zip file correctly (#16697)
     new 4c00702e5b0 [fix][client] Remove redundant check for chunked message TotalChunkMsgSize in ConsumerImpl (#16797)
     new e434a531a11 [fix][broker]remove exception log when access status.html (#17025)

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  17 +-
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  17 ++
 pom.xml                                            |   4 +
 .../MultiRolesTokenAuthorizationProvider.java      |  86 ++++++--
 .../authorization/PulsarAuthorizationProvider.java |   3 +-
 .../pulsar/common/configuration/VipStatus.java     |   4 +-
 .../client/api/ClientAuthenticationTlsTest.java    | 183 ++++++++++++++++
 .../apache/pulsar/client/api/ClientErrorsTest.java |  47 ++++-
 .../MultiRolesTokenAuthorizationProviderTest.java  | 231 +++++++++++++++++++++
 .../pulsar/client/api/ProducerConsumerBase.java    |  21 +-
 .../apache/pulsar/client/api/RetryTopicTest.java   |  60 ++++++
 pulsar-client-tools/pom.xml                        |   4 +
 pulsar-client/pom.xml                              |   5 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  88 ++++----
 .../apache/pulsar/common/util/SecurityUtility.java |   7 +-
 .../src/main/python/python_instance_main.py        |  10 +-
 pulsar-functions/localrun-shaded/pom.xml           |  66 +++++-
 .../apache/pulsar/io/kafka/KafkaSourceConfig.java  |   2 +
 .../io/kafka/source/KafkaAbstractSourceTest.java   |  19 ++
 .../pulsar/proxy/server/ProxyConnection.java       |   4 +-
 .../pulsar/proxy/server/ProxyConnectionTest.java   |  24 +++
 site2/docs/developing-binary-protocol.md           |   9 +
 22 files changed, 836 insertions(+), 75 deletions(-)
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientAuthenticationTlsTest.java
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java


[pulsar] 05/12: [fix][broker] Fix stats-internal with option -m cause active ledger recover then close (#16662)

Posted by ma...@apache.org.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6f33cdea0543a00412a0b5fb1fa3ea4e3c80bb7d
Author: yapeng <56...@qq.com>
AuthorDate: Wed Jul 27 10:01:33 2022 +0800

    [fix][broker] Fix stats-internal with option -m cause active ledger recover then close (#16662)
    
    (cherry picked from commit b3bced282d647a47f9fd60db35390cc6348d25dc)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java      | 17 +++++++++++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java      | 17 +++++++++++++++++
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 0f879b057a9..28232f31b17 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1726,7 +1726,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     }
 
     public CompletableFuture<String> getLedgerMetadata(long ledgerId) {
-        return getLedgerHandle(ledgerId).thenApply(rh -> rh.getLedgerMetadata().toSafeString());
+        LedgerHandle currentLedger = this.currentLedger;
+        if (currentLedger != null && ledgerId == currentLedger.getId()) {
+            return CompletableFuture.completedFuture(currentLedger.getLedgerMetadata().toSafeString());
+        } else {
+            return getLedgerHandle(ledgerId).thenApply(rh -> rh.getLedgerMetadata().toSafeString());
+        }
     }
 
     @Override
@@ -4006,7 +4011,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             return CompletableFuture.completedFuture(Collections.emptySet());
         }
 
-        return getLedgerHandle(ledgerId).thenCompose(lh -> {
+        CompletableFuture<ReadHandle> ledgerHandleFuture;
+        LedgerHandle currentLedger = this.currentLedger;
+        if (currentLedger != null && ledgerId == currentLedger.getId()) {
+            ledgerHandleFuture = CompletableFuture.completedFuture(currentLedger);
+        } else {
+            ledgerHandleFuture = getLedgerHandle(ledgerId);
+        }
+
+        return ledgerHandleFuture.thenCompose(lh -> {
             Set<BookieId> ensembles = new HashSet<>();
             lh.getLedgerMetadata().getAllEnsembles().values().forEach(ensembles::addAll);
             return CompletableFuture.completedFuture(ensembles);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 317fb7e2b30..ae835dccb3f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3519,4 +3519,21 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
             Assert.assertFalse(ledgerInfo.get(100, TimeUnit.MILLISECONDS).getOffloadContext().getComplete());
         });
     }
+
+    @Test
+    public void testGetLedgerMetadata() throws Exception {
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) factory.open("testGetLedgerMetadata");
+        long lastLedger = managedLedger.ledgers.lastEntry().getKey();
+        managedLedger.getLedgerMetadata(lastLedger);
+        Assert.assertFalse(managedLedger.ledgerCache.containsKey(lastLedger));
+    }
+
+    @Test
+    public void testGetEnsemblesAsync() throws Exception {
+        // test getEnsemblesAsync of latest ledger will not open it twice and put it in ledgerCache.
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) factory.open("testGetLedgerMetadata");
+        long lastLedger = managedLedger.ledgers.lastEntry().getKey();
+        managedLedger.getEnsemblesAsync(lastLedger).join();
+        Assert.assertFalse(managedLedger.ledgerCache.containsKey(lastLedger));
+    }
 }
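
The pattern in the ManagedLedgerImpl change above can be illustrated outside of Pulsar. The following is a minimal sketch, not the project's code: `LedgerLookup`, `Handle`, and `openReadOnly` are hypothetical stand-ins, and the cache is a plain map. It shows the idea of answering a lookup from the handle that is already open for writing, so that the active ledger is not reopened and never lands in the read-handle cache.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in types for illustration; not Pulsar or BookKeeper classes.
class LedgerLookup {
    static final class Handle {
        final long id;
        final String metadata;

        Handle(long id, String metadata) {
            this.id = id;
            this.metadata = metadata;
        }
    }

    // Handle of the ledger currently open for writes; may be null.
    private volatile Handle current;
    // Read-only handles opened on demand, keyed by ledger id.
    final Map<Long, CompletableFuture<Handle>> cache = new ConcurrentHashMap<>();

    LedgerLookup(Handle current) {
        this.current = current;
    }

    // Assumed helper: pretends to open a closed ledger for reading.
    private CompletableFuture<Handle> openReadOnly(long ledgerId) {
        return CompletableFuture.completedFuture(new Handle(ledgerId, "closed-ledger-" + ledgerId));
    }

    CompletableFuture<String> metadata(long ledgerId) {
        Handle snapshot = this.current; // read the volatile field once
        if (snapshot != null && snapshot.id == ledgerId) {
            // Active ledger: reuse the open handle and leave the cache untouched.
            return CompletableFuture.completedFuture(snapshot.metadata);
        }
        // Any other ledger: open it for reading (cached) and map to its metadata.
        return cache.computeIfAbsent(ledgerId, this::openReadOnly).thenApply(h -> h.metadata);
    }

    public static void main(String[] args) {
        LedgerLookup lookup = new LedgerLookup(new Handle(7L, "open-ledger-7"));
        System.out.println(lookup.metadata(7L).join());   // metadata of the active ledger
        System.out.println(lookup.cache.containsKey(7L)); // false: the active ledger was not cached
    }
}
```

Copying the field into a local variable before the null check mirrors the diff: the comparison and the use then see the same handle even if the field is swapped concurrently.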


[pulsar] 11/12: [fix][client] Remove redundant check for chunked message TotalChunkMsgSize in ConsumerImpl (#16797)

Posted by ma...@apache.org.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4c00702e5b0a196b3867ed876152edf972e59242
Author: Jiaqi Shen <50...@users.noreply.github.com>
AuthorDate: Fri Aug 5 15:26:01 2022 +0800

    [fix][client] Remove redundant check for chunked message TotalChunkMsgSize in ConsumerImpl (#16797)
    
    ### Motivation
    
    There is an incorrect out-of-order check for chunked messages in `ConsumerImpl`. The last condition should compare the result of `chunkedMsgCtx.chunkedMsgBuffer.readableBytes() + compressedPayload.readableBytes()` with `TotalChunkMsgSize`, not `ChunkId` with `TotalChunkMsgSize`.
    
    ### Modifications
    
    Fix the out-of-order check for chunked messages in `ConsumerImpl`.
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 0079492be07..04865d39995 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1282,12 +1282,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid());
         // discard message if chunk is out-of-order
         if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
-                || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)
-                || msgMetadata.getChunkId() >= msgMetadata.getTotalChunkMsgSize()) {
+                || msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
             // means we lost the first chunk: should never happen
-            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}, total-chunks {}", msgId,
-                    (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId(),
-                    msgMetadata.getTotalChunkMsgSize());
+            log.info("Received unexpected chunk messageId {}, last-chunk-id{}, chunkId = {}", msgId,
+                    (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
             if (chunkedMsgCtx != null) {
                 if (chunkedMsgCtx.chunkedMsgBuffer != null) {
                     ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
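
After this change the out-of-order test depends only on whether a context exists for the message and whether the chunk id directly follows the previous one. A minimal sketch of that sequential check follows. `ChunkTracker` is a hypothetical class keyed by message UUID, not the client's `ChunkedMessageCtx`, and treating chunk id 0 as the start of a new message is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a sequential chunk-order check; not Pulsar client code.
class ChunkTracker {
    // Last accepted chunk id per message UUID.
    private final Map<String, Integer> lastChunkId = new HashMap<>();

    // Returns true if the chunk is accepted, false if it is out of order.
    // On an out-of-order chunk, any state held for that UUID is dropped.
    boolean accept(String uuid, int chunkId) {
        if (chunkId == 0) {
            // Assumption: chunk 0 always starts (or restarts) a message.
            lastChunkId.put(uuid, 0);
            return true;
        }
        Integer last = lastChunkId.get(uuid);
        if (last == null || chunkId != last + 1) {
            lastChunkId.remove(uuid);
            return false;
        }
        lastChunkId.put(uuid, chunkId);
        return true;
    }

    public static void main(String[] args) {
        ChunkTracker tracker = new ChunkTracker();
        System.out.println(tracker.accept("msg-a", 0)); // true
        System.out.println(tracker.accept("msg-a", 1)); // true
        System.out.println(tracker.accept("msg-a", 3)); // false: chunk 2 was skipped
    }
}
```

Note that the check compares chunk ids only with other chunk ids. The removed condition compared a chunk id with a total size, which are different quantities.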


[pulsar] 09/12: [Java Client] Send CloseConsumer on timeout (#16616)

Posted by ma...@apache.org.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4fba95abb701de186235ff6115955acb6e389a64
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Sat Jul 30 10:31:30 2022 +0800

    [Java Client] Send CloseConsumer on timeout (#16616)
    
    (cherry picked from commit 8f316558e2b3204cd197cd61f7173d64987fc918)
---
 .../apache/pulsar/client/api/ClientErrorsTest.java | 47 ++++++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 69 +++++++++++++---------
 site2/docs/developing-binary-protocol.md           |  9 +++
 3 files changed, 95 insertions(+), 30 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index d7507d2b47c..d98f0d57da0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -240,7 +240,7 @@ public class ClientErrorsTest {
         });
 
         // Create producer should succeed then upon closure, it should reattempt creation. The first request will
-        // timeout, which triggers CloseProducer. The client might send send the third Producer command before the
+        // time out, which triggers CloseProducer. The client might send the third Producer command before the
         // below assertion, so we pass with 2 or 3.
         client.newProducer().topic(topic).create();
         Awaitility.await().until(() -> closeProducerCounter.get() == 1);
@@ -249,6 +249,51 @@ public class ClientErrorsTest {
         mockBrokerService.resetHandleCloseProducer();
     }
 
+    @Test
+    public void testCreatedConsumerSendsCloseConsumerAfterTimeout() throws Exception {
+        consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1");
+    }
+
+    @Test
+    public void testCreatedPartitionedConsumerSendsCloseConsumerAfterTimeout() throws Exception {
+        consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1");
+    }
+
+    private void consumerCreatedThenFailsRetryTimeout(String topic) throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
+                .operationTimeout(1, TimeUnit.SECONDS).build();
+        final AtomicInteger subscribeCounter = new AtomicInteger(0);
+        final AtomicInteger closeConsumerCounter = new AtomicInteger(0);
+
+        mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
+            int subscribeCount = subscribeCounter.incrementAndGet();
+            if (subscribeCount == 1) {
+                ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
+                // Trigger reconnect
+                ctx.writeAndFlush(Commands.newCloseConsumer(subscribe.getConsumerId(), -1));
+            } else if (subscribeCount != 2) {
+                // Respond to subsequent requests to prevent timeouts
+                ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
+            }
+            // Don't respond to the second Subscribe command to ensure timeout
+        });
+
+        mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> {
+            closeConsumerCounter.incrementAndGet();
+            ctx.writeAndFlush(Commands.newSuccess(closeConsumer.getRequestId()));
+        });
+
+        // Create consumer (subscribe) should succeed then upon closure, it should reattempt creation. The first
+        // request will time out, which triggers CloseConsumer. The client might send the third Subscribe command before
+        // the below assertion, so we pass with 2 or 3.
+        client.newConsumer().topic(topic).subscriptionName("test").subscribe();
+        Awaitility.await().until(() -> closeConsumerCounter.get() == 1);
+        Awaitility.await().until(() -> subscribeCounter.get() == 2 || subscribeCounter.get() == 3);
+        mockBrokerService.resetHandleSubscribe();
+        mockBrokerService.resetHandleCloseConsumer();
+    }
+
     @Test
     public void testProducerFailDoesNotFailOtherProducer() throws Exception {
         producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ef06a2e71aa..0079492be07 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -792,35 +792,46 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             }
             log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, subscription, cnx.channel().remoteAddress());
 
-            if (e.getCause() instanceof PulsarClientException
-                    && PulsarClientException.isRetriableError(e.getCause())
-                    && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
-                reconnectLater(e.getCause());
-            } else if (!subscribeFuture.isDone()) {
-                // unable to create new consumer, fail operation
-                setState(State.Failed);
-                closeConsumerTasks();
-                subscribeFuture.completeExceptionally(
-                    PulsarClientException.wrap(e, String.format("Failed to subscribe the topic %s with subscription " +
-                        "name %s when connecting to the broker", topicName.toString(), subscription)));
-                client.cleanupConsumer(this);
-            } else if (e.getCause() instanceof TopicDoesNotExistException) {
-                // The topic was deleted after the consumer was created, and we're
-                // not allowed to recreate the topic. This can happen in few cases:
-                //  * Regex consumer getting error after topic gets deleted
-                //  * Regular consumer after topic is manually delete and with
-                //    auto-topic-creation set to false
-                // No more retries are needed in this case.
-                setState(State.Failed);
-                closeConsumerTasks();
-                client.cleanupConsumer(this);
-                log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}", topic, subscription, cnx.channel().remoteAddress());
-            } else {
-                // consumer was subscribed and connected but we got some error, keep trying
-                reconnectLater(e.getCause());
-            }
-            return null;
-        });
+                if (e.getCause() instanceof PulsarClientException.TimeoutException) {
+                    // Creating the consumer has timed out. We need to ensure the broker closes the consumer
+                    // in case it was indeed created, otherwise it might prevent new create consumer operation,
+                    // since we are not necessarily closing the connection.
+                    long closeRequestId = client.newRequestId();
+                    ByteBuf cmd = Commands.newCloseConsumer(consumerId, closeRequestId);
+                    cnx.sendRequestWithId(cmd, closeRequestId);
+                }
+
+                if (e.getCause() instanceof PulsarClientException
+                        && PulsarClientException.isRetriableError(e.getCause())
+                        && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
+                    reconnectLater(e.getCause());
+                } else if (!subscribeFuture.isDone()) {
+                    // unable to create new consumer, fail operation
+                    setState(State.Failed);
+                    closeConsumerTasks();
+                    subscribeFuture.completeExceptionally(
+                            PulsarClientException.wrap(e, String.format("Failed to subscribe the topic %s "
+                                            + "with subscription name %s when connecting to the broker",
+                                    topicName.toString(), subscription)));
+                    client.cleanupConsumer(this);
+                } else if (e.getCause() instanceof TopicDoesNotExistException) {
+                    // The topic was deleted after the consumer was created, and we're
+                    // not allowed to recreate the topic. This can happen in few cases:
+                    //  * Regex consumer getting error after topic gets deleted
+                    //  * Regular consumer after topic is manually delete and with
+                    //    auto-topic-creation set to false
+                    // No more retries are needed in this case.
+                    setState(State.Failed);
+                    closeConsumerTasks();
+                    client.cleanupConsumer(this);
+                    log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}",
+                            topic, subscription, cnx.channel().remoteAddress());
+                } else {
+                    // consumer was subscribed and connected but we got some error, keep trying
+                    reconnectLater(e.getCause());
+                }
+                return null;
+            });
     }
 
     protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize) {
diff --git a/site2/docs/developing-binary-protocol.md b/site2/docs/developing-binary-protocol.md
index 33861af0da7..85562432263 100644
--- a/site2/docs/developing-binary-protocol.md
+++ b/site2/docs/developing-binary-protocol.md
@@ -279,6 +279,10 @@ subscription is not already there, a new one will be created.
 
 ![Consumer](assets/binary-protocol-consumer.png)
 
+If the client does not receive a response indicating consumer creation success or failure,
+the client should first send a command to close the original consumer before sending a
+command to re-attempt consumer creation.
+
 #### Flow control
 
 After the consumer is ready, the client needs to *give permission* to the
@@ -388,6 +392,11 @@ Parameters:
 
 This command behaves the same as [`CloseProducer`](#command-closeproducer)
 
+If the client does not receive a response to a `Subscribe` command within a timeout,
+the client must first send a `CloseConsumer` command before sending another
+`Subscribe` command. The client does not need to await a response to the `CloseConsumer`
+command before sending the next `Subscribe` command.
+
 ##### Command RedeliverUnacknowledgedMessages
 
 A consumer can ask the broker to redeliver some or all of the pending messages
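
The rule documented in the diff above (send `CloseConsumer` after a `Subscribe` times out, then retry without waiting for the close response) can be sketched as follows. `SubscribeRetry` is hypothetical and only records the commands it would send as strings. It does not use the Pulsar client or wire protocol classes, and retrying immediately on every failure is a simplification of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of the close-before-retry rule; not Pulsar client code.
class SubscribeRetry {
    // Commands "sent" to the broker, in order.
    final List<String> sent = new ArrayList<>();
    private long nextRequestId = 0;

    void subscribe(long consumerId) {
        sent.add("Subscribe(consumer=" + consumerId + ", request=" + nextRequestId++ + ")");
    }

    // Called when a Subscribe attempt fails for the given reason.
    void onSubscribeFailure(long consumerId, Throwable cause) {
        if (cause instanceof TimeoutException) {
            // The broker may have created the consumer even though no response arrived,
            // so ask it to close that consumer before trying again.
            sent.add("CloseConsumer(consumer=" + consumerId + ", request=" + nextRequestId++ + ")");
        }
        // The retry is not delayed until the CloseConsumer response arrives.
        subscribe(consumerId);
    }

    public static void main(String[] args) {
        SubscribeRetry retry = new SubscribeRetry();
        retry.subscribe(1L);
        retry.onSubscribeFailure(1L, new TimeoutException("no response"));
        retry.sent.forEach(System.out::println);
    }
}
```

Running `main` prints a `Subscribe`, then a `CloseConsumer`, then a second `Subscribe`, each with its own request id.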


[pulsar] 08/12: [fix][proxy] Fix client service url (#16834)

Posted by ma...@apache.org.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7b2f39cac55f95d02ed68bb9cccddd6a09764c51
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Fri Jul 29 15:09:26 2022 +0800

    [fix][proxy] Fix client service url (#16834)
    
    (cherry picked from commit eedee403da4ee531546b6440d82ed4bf14fa333a)
---
 .../pulsar/proxy/server/ProxyConnection.java       |  4 +++-
 .../pulsar/proxy/server/ProxyConnectionTest.java   | 24 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 0f41208fde2..42b4d0f08f7 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -525,8 +525,10 @@ public class ProxyConnection extends PulsarHandler {
 
     ClientConfigurationData createClientConfiguration() {
         ClientConfigurationData initialConf = new ClientConfigurationData();
-        initialConf.setServiceUrl(service.getServiceUrl());
         ProxyConfiguration proxyConfig = service.getConfiguration();
+        initialConf.setServiceUrl(
+                proxyConfig.isTlsEnabledWithBroker() ? service.getServiceUrlTls() : service.getServiceUrl());
+
         // Apply all arbitrary configuration. This must be called before setting any fields annotated as
         // @Secret on the ClientConfigurationData object because of the way they are serialized.
         // See https://github.com/apache/pulsar/issues/8509 for more information.
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
index 5f533e37d35..8c07e4b42d7 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
@@ -18,8 +18,12 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.testng.annotations.Test;
 
 public class ProxyConnectionTest {
@@ -35,4 +39,24 @@ public class ProxyConnectionTest {
         assertFalse(ProxyConnection
                 .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", "1.2.3.4:1234"));
     }
+    @Test
+    public void testCreateClientConfiguration() {
+        ProxyConfiguration proxyConfiguration = new ProxyConfiguration();
+        proxyConfiguration.setTlsEnabledWithBroker(true);
+        String proxyUrlTls = "pulsar+ssl://proxy:6651";
+        String proxyUrl = "pulsar://proxy:6650";
+
+        ProxyService proxyService = mock(ProxyService.class);
+        doReturn(proxyConfiguration).when(proxyService).getConfiguration();
+        doReturn(proxyUrlTls).when(proxyService).getServiceUrlTls();
+        doReturn(proxyUrl).when(proxyService).getServiceUrl();
+
+        ProxyConnection proxyConnection = new ProxyConnection(proxyService, null);
+        ClientConfigurationData clientConfiguration = proxyConnection.createClientConfiguration();
+        assertEquals(clientConfiguration.getServiceUrl(), proxyUrlTls);
+
+        proxyConfiguration.setTlsEnabledWithBroker(false);
+        clientConfiguration = proxyConnection.createClientConfiguration();
+        assertEquals(clientConfiguration.getServiceUrl(), proxyUrl);
+    }
 }


[pulsar] 10/12: [fix][function] Fix python instance not process zip file correctly (#16697)

Posted by ma...@apache.org.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4f03d0056b4f4ff0e4c0e8048094117db1867fb0
Author: jiangpengcheng <sc...@gmail.com>
AuthorDate: Wed Aug 3 13:58:48 2022 +0800

    [fix][function] Fix python instance not process zip file correctly (#16697)
    
    (cherry picked from commit 68e454544d45734129e67ab3032eae3e40ff664c)
---
 .../instance/src/main/python/python_instance_main.py           | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 627013489ab..0672142e334 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -99,7 +99,7 @@ def main():
 
   if os.path.splitext(str(args.py))[1] == '.whl':
     if args.install_usercode_dependencies:
-      cmd = "pip install -t %s" % os.path.dirname(str(args.py))
+      cmd = "pip install -t %s" % os.path.dirname(os.path.abspath(str(args.py)))
       if args.dependency_repository:
         cmd = cmd + " -i %s" % str(args.dependency_repository)
       if args.extra_dependency_repository:
@@ -112,7 +112,7 @@ def main():
     else:
       zpfile = zipfile.ZipFile(str(args.py), 'r')
       zpfile.extractall(os.path.dirname(str(args.py)))
-    sys.path.insert(0, os.path.dirname(str(args.py)))
+    sys.path.insert(0, os.path.dirname(os.path.abspath(str(args.py))))
   elif os.path.splitext(str(args.py))[1] == '.zip':
     # Assumig zip file with format func.zip
     # extract to folder function
@@ -123,21 +123,21 @@ def main():
     # run pip install to target folder  deps folder
     zpfile = zipfile.ZipFile(str(args.py), 'r')
     zpfile.extractall(os.path.dirname(str(args.py)))
-    basename = os.path.splitext(str(args.py))[0]
+    basename = os.path.basename(os.path.splitext(str(args.py))[0])
 
     deps_dir = os.path.join(os.path.dirname(str(args.py)), basename, "deps")
 
     if os.path.isdir(deps_dir) and os.listdir(deps_dir):
       # get all wheel files from deps directory
       wheel_file_list = [os.path.join(deps_dir, f) for f in os.listdir(deps_dir) if os.path.isfile(os.path.join(deps_dir, f)) and os.path.splitext(f)[1] =='.whl']
-      cmd = "pip install -t %s --no-index --find-links %s %s" % (os.path.dirname(str(args.py)), deps_dir, " ".join(wheel_file_list))
+      cmd = "pip install -t %s --no-index --find-links %s %s" % (os.path.dirname(os.path.abspath(str(args.py))), deps_dir, " ".join(wheel_file_list))
       Log.debug("Install python dependencies via cmd: %s" % cmd)
       retval = os.system(cmd)
       if retval != 0:
         print("Could not install user depedencies specified by the zip file")
         sys.exit(1)
     # add python user src directory to path
-    sys.path.insert(0, os.path.join(os.path.dirname(str(args.py)), basename, "src"))
+    sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(str(args.py))), basename, "src"))
 
   log_file = os.path.join(args.logging_directory,
                           util.getFullyQualifiedFunctionName(function_details.tenant, function_details.namespace, function_details.name),


[pulsar] 02/12: [fix][authorization] Fix multiple roles authorization (#16645)

Posted by ma...@apache.org.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0cc42d9130d6245a3b278ff183cb72be4c1a4927
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Mon Jul 25 17:57:19 2022 +0800

    [fix][authorization] Fix multiple roles authorization (#16645)
    
    (cherry picked from commit d8483d48cb21e8e99fd56c786e5198f7fe7135f6)
---
 .../MultiRolesTokenAuthorizationProvider.java      |  86 ++++++--
 .../authorization/PulsarAuthorizationProvider.java |   3 +-
 .../MultiRolesTokenAuthorizationProviderTest.java  | 231 +++++++++++++++++++++
 3 files changed, 307 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
index b8f46a52483..d72c951c889 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java
@@ -26,9 +26,12 @@ import io.jsonwebtoken.RequiredTypeException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import javax.ws.rs.core.Response;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -38,9 +41,12 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.PolicyName;
 import org.apache.pulsar.common.policies.data.PolicyOperation;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RestException;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,56 +85,112 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro
         super.initialize(conf, pulsarResources);
     }
 
-    private List<String> getRoles(AuthenticationDataSource authData) {
+    @Override
+    public CompletableFuture<Boolean> isSuperUser(String role, AuthenticationDataSource authenticationData,
+                                                  ServiceConfiguration serviceConfiguration) {
+        Set<String> roles = getRoles(authenticationData);
+        if (roles.isEmpty()) {
+            return CompletableFuture.completedFuture(false);
+        }
+        Set<String> superUserRoles = serviceConfiguration.getSuperUserRoles();
+        if (superUserRoles.isEmpty()) {
+            return CompletableFuture.completedFuture(false);
+        }
+
+        return CompletableFuture.completedFuture(roles.stream().anyMatch(superUserRoles::contains));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName, String role,
+                                                                AuthenticationDataSource authData) {
+        return isSuperUser(role, authData, conf)
+                .thenCompose(isSuperUser -> {
+                    if (isSuperUser) {
+                        return CompletableFuture.completedFuture(true);
+                    }
+                    Set<String> roles = getRoles(authData);
+                    if (roles.isEmpty()) {
+                        return CompletableFuture.completedFuture(false);
+                    }
+
+                    return pulsarResources.getTenantResources()
+                            .getTenantAsync(tenantName)
+                            .thenCompose(op -> {
+                                if (op.isPresent()) {
+                                    TenantInfo tenantInfo = op.get();
+                                    if (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty()) {
+                                        return CompletableFuture.completedFuture(false);
+                                    }
+
+                                    return CompletableFuture.completedFuture(roles.stream()
+                                            .anyMatch(n -> tenantInfo.getAdminRoles().contains(n)));
+                                } else {
+                                    throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
+                                }
+                            }).exceptionally(ex -> {
+                                Throwable cause = ex.getCause();
+                                if (cause instanceof MetadataStoreException.NotFoundException) {
+                                    log.warn("Failed to get tenant info data for non existing tenant {}", tenantName);
+                                    throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
+                                }
+                                log.error("Failed to get tenant {}", tenantName, cause);
+                                throw new RestException(cause);
+                            });
+                });
+    }
+
+    private Set<String> getRoles(AuthenticationDataSource authData) {
         String token = null;
 
         if (authData.hasDataFromCommand()) {
             // Authenticate Pulsar binary connection
             token = authData.getCommandData();
             if (StringUtils.isBlank(token)) {
-                return Collections.emptyList();
+                return Collections.emptySet();
             }
         } else if (authData.hasDataFromHttp()) {
             // The format here should be compliant to RFC-6750
             // (https://tools.ietf.org/html/rfc6750#section-2.1). Eg: Authorization: Bearer xxxxxxxxxxxxx
             String httpHeaderValue = authData.getHttpHeader(HTTP_HEADER_NAME);
             if (httpHeaderValue == null || !httpHeaderValue.startsWith(HTTP_HEADER_VALUE_PREFIX)) {
-                return Collections.emptyList();
+                return Collections.emptySet();
             }
 
             // Remove prefix
             token = httpHeaderValue.substring(HTTP_HEADER_VALUE_PREFIX.length());
         }
 
-        if (token == null)
-            return Collections.emptyList();
+        if (token == null) {
+            return Collections.emptySet();
+        }
 
         String[] splitToken = token.split("\\.");
         if (splitToken.length < 2) {
             log.warn("Unable to extract additional roles from JWT token");
-            return Collections.emptyList();
+            return Collections.emptySet();
         }
         String unsignedToken = splitToken[0] + "." + splitToken[1] + ".";
 
         Jwt<?, Claims> jwt = parser.parseClaimsJwt(unsignedToken);
         try {
-            return Collections.singletonList(jwt.getBody().get(roleClaim, String.class));
+            return new HashSet<>(Collections.singletonList(jwt.getBody().get(roleClaim, String.class)));
         } catch (RequiredTypeException requiredTypeException) {
             try {
                 List list = jwt.getBody().get(roleClaim, List.class);
                 if (list != null) {
-                    return list;
+                    return new HashSet<String>(list);
                 }
             } catch (RequiredTypeException requiredTypeException1) {
-                return Collections.emptyList();
+                return Collections.emptySet();
             }
         }
 
-        return Collections.emptyList();
+        return Collections.emptySet();
     }
 
-    public CompletableFuture<Boolean> authorize(AuthenticationDataSource authenticationData, Function<String, CompletableFuture<Boolean>> authorizeFunc) {
-        List<String> roles = getRoles(authenticationData);
+    public CompletableFuture<Boolean> authorize(AuthenticationDataSource authenticationData, Function<String,
+            CompletableFuture<Boolean>> authorizeFunc) {
+        Set<String> roles = getRoles(authenticationData);
         if (roles.isEmpty()) {
             return CompletableFuture.completedFuture(false);
         }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 097464bfb5f..b753d2ed634 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -59,7 +59,8 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
     private static final Logger log = LoggerFactory.getLogger(PulsarAuthorizationProvider.class);
 
     public ServiceConfiguration conf;
-    private PulsarResources pulsarResources;
+
+    protected PulsarResources pulsarResources;
 
 
     public PulsarAuthorizationProvider() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
new file mode 100644
index 00000000000..12d7c71358b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiRolesTokenAuthorizationProviderTest.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertThrows;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.MultiRolesTokenAuthorizationProvider;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class MultiRolesTokenAuthorizationProviderTest extends MockedPulsarServiceBaseTest {
+
+    private final SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+    private final String superUserToken;
+    private final String normalUserToken;
+
+    public MultiRolesTokenAuthorizationProviderTest() {
+        Map<String, Object> claims = new HashMap<>();
+        Set<String> roles = new HashSet<>();
+        roles.add("user1");
+        roles.add("superUser");
+        claims.put("roles", roles);
+        superUserToken = Jwts.builder()
+                .setClaims(claims)
+                .signWith(secretKey)
+                .compact();
+
+        roles = new HashSet<>();
+        roles.add("normalUser");
+        roles.add("user2");
+        roles.add("user5");
+        claims.put("roles", roles);
+        normalUserToken = Jwts.builder()
+                .setClaims(claims)
+                .signWith(secretKey)
+                .compact();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("superUser");
+        conf.setSuperUserRoles(superUserRoles);
+
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey",
+                "data:;base64," + Base64.getEncoder().encodeToString(secretKey.getEncoded()));
+        properties.setProperty("tokenAuthClaim", "roles");
+        conf.setProperties(properties);
+
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        conf.setBrokerClientAuthenticationParameters(superUserToken);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        conf.setAuthenticationProviders(providers);
+        conf.setAuthorizationProvider(MultiRolesTokenAuthorizationProvider.class.getName());
+
+        conf.setClusterName(configClusterName);
+        conf.setNumExecutorThreadPoolSize(5);
+    }
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+
+        admin.clusters().createCluster(configClusterName,
+                ClusterData.builder()
+                        .brokerServiceUrl(brokerUrl.toString())
+                        .serviceUrl(getPulsar().getWebServiceAddress())
+                        .build()
+        );
+    }
+
+    @BeforeClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        clientBuilder.authentication(new AuthenticationToken(superUserToken));
+    }
+
+    @Override
+    protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+        pulsarAdminBuilder.authentication(new AuthenticationToken(superUserToken));
+    }
+
+    private PulsarAdmin newPulsarAdmin(String token) throws PulsarClientException {
+        return PulsarAdmin.builder()
+                .serviceHttpUrl(pulsar.getWebServiceAddress())
+                .authentication(new AuthenticationToken(token))
+                .requestTimeout(3, TimeUnit.SECONDS)
+                .build();
+    }
+
+    private PulsarClient newPulsarClient(String token) throws PulsarClientException {
+        return PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .authentication(new AuthenticationToken(token))
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .build();
+    }
+
+    @Test
+    public void testAdminRequestWithSuperUserToken() throws Exception {
+        String tenant = "superuser-admin-tenant";
+        @Cleanup
+        PulsarAdmin admin = newPulsarAdmin(superUserToken);
+        admin.tenants().createTenant(tenant, TenantInfo.builder()
+                .allowedClusters(Sets.newHashSet(configClusterName)).build());
+        String namespace = "superuser-admin-namespace";
+        admin.namespaces().createNamespace(tenant + "/" + namespace);
+        admin.brokers().getAllDynamicConfigurations();
+        admin.tenants().getTenants();
+        admin.topics().getList(tenant + "/" + namespace);
+    }
+
+    @Test
+    public void testProduceAndConsumeWithSuperUserToken() throws Exception {
+        String tenant = "superuser-client-tenant";
+        @Cleanup
+        PulsarAdmin admin = newPulsarAdmin(superUserToken);
+        admin.tenants().createTenant(tenant, TenantInfo.builder()
+                .allowedClusters(Sets.newHashSet(configClusterName)).build());
+        String namespace = "superuser-client-namespace";
+        admin.namespaces().createNamespace(tenant + "/" + namespace);
+        String topic = tenant + "/" + namespace + "/" + "test-topic";
+
+        @Cleanup
+        PulsarClient client = newPulsarClient(superUserToken);
+        @Cleanup
+        Producer<byte[]> producer = client.newProducer().topic(topic).create();
+        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
+        producer.send(body);
+
+        @Cleanup
+        Consumer<byte[]> consumer = client.newConsumer().topic(topic)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test")
+                .subscribe();
+        Message<byte[]> message = consumer.receive(3, TimeUnit.SECONDS);
+        assertNotNull(message);
+        assertEquals(message.getData(), body);
+    }
+
+    @Test
+    public void testAdminRequestWithNormalUserToken() throws Exception {
+        String tenant = "normaluser-admin-tenant";
+        @Cleanup
+        PulsarAdmin admin = newPulsarAdmin(normalUserToken);
+
+        assertThrows(PulsarAdminException.NotAuthorizedException.class,
+                () -> admin.tenants().createTenant(tenant, TenantInfo.builder()
+                        .allowedClusters(Sets.newHashSet(configClusterName)).build()));
+    }
+
+    @Test
+    public void testProduceAndConsumeWithNormalUserToken() throws Exception {
+        String tenant = "normaluser-client-tenant";
+        @Cleanup
+        PulsarAdmin admin = newPulsarAdmin(superUserToken);
+        admin.tenants().createTenant(tenant, TenantInfo.builder()
+                .allowedClusters(Sets.newHashSet(configClusterName)).build());
+        String namespace = "normaluser-client-namespace";
+        admin.namespaces().createNamespace(tenant + "/" + namespace);
+        String topic = tenant + "/" + namespace + "/" + "test-topic";
+
+        @Cleanup
+        PulsarClient client = newPulsarClient(normalUserToken);
+        assertThrows(PulsarClientException.AuthorizationException.class, () -> {
+            @Cleanup
+            Producer<byte[]> ignored = client.newProducer().topic(topic).create();
+        });
+
+        assertThrows(PulsarClientException.AuthorizationException.class, () -> {
+            @Cleanup
+            Consumer<byte[]> ignored = client.newConsumer().topic(topic)
+                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                    .subscriptionName("test")
+                    .subscribe();
+        });
+    }
+}

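Note on the patch above: the core of the multiple-roles fix is that a token may carry either a single role or a list of roles, and access is granted when any one of them matches a configured role. The following is a minimal standalone sketch of that idea, not the provider's actual code. It assumes plain `Object` and `Set<String>` inputs in place of the JWT claims, `AuthenticationDataSource`, and `ServiceConfiguration`, and it uses `instanceof` checks where the patch relies on the JWT library's typed `get` with a `RequiredTypeException` fallback. The class `RoleMatchSketch` and both method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not part of the Pulsar code base.
public class RoleMatchSketch {

    // Normalizes a role claim that is either a single String or a List into a Set.
    // Any other shape (including null) yields an empty set.
    public static Set<String> rolesFromClaim(Object claim) {
        if (claim instanceof String) {
            return new HashSet<>(Collections.singletonList((String) claim));
        }
        if (claim instanceof List) {
            Set<String> roles = new HashSet<>();
            for (Object item : (List<?>) claim) {
                roles.add(String.valueOf(item));
            }
            return roles;
        }
        return Collections.emptySet();
    }

    // True when at least one token role is a configured super-user role.
    // Empty inputs short-circuit to false, mirroring the early returns in the patch.
    public static boolean isSuperUser(Set<String> tokenRoles, Set<String> superUserRoles) {
        if (tokenRoles.isEmpty() || superUserRoles.isEmpty()) {
            return false;
        }
        return tokenRoles.stream().anyMatch(superUserRoles::contains);
    }

    public static void main(String[] args) {
        Set<String> superUserRoles = Collections.singleton("superUser");
        // Role sets taken from the test tokens in the patch.
        System.out.println(isSuperUser(
                rolesFromClaim(Arrays.asList("user1", "superUser")), superUserRoles)); // true
        System.out.println(isSuperUser(
                rolesFromClaim(Arrays.asList("normalUser", "user2", "user5")), superUserRoles)); // false
        // A single-string claim is handled the same way.
        System.out.println(isSuperUser(rolesFromClaim("superUser"), superUserRoles)); // true
    }
}
```

Returning a `Set` rather than a `List` makes each membership test a hash lookup and drops duplicate roles, which appears to be the motivation for the `List` to `Set` change in `getRoles`.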

[pulsar] 12/12: [fix][broker]remove exception log when access status.html (#17025)

Posted by ma...@apache.org.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e434a531a112b67fd62fa17af2eb111bf9c28125
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Wed Aug 10 13:46:12 2022 +0800

    [fix][broker]remove exception log when access status.html (#17025)
    
    (cherry picked from commit 4b757cf9f9046c7143329156b1009fe43217eaea)
---
 .../main/java/org/apache/pulsar/common/configuration/VipStatus.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
index c80f0a5471c..af1c8b34ead 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/VipStatus.java
@@ -27,10 +27,12 @@ import javax.ws.rs.Path;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response.Status;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Web resource used by the VIP service to check the availability of the service instance.
  */
+@Slf4j
 @Path("/status.html")
 public class VipStatus {
 
@@ -41,7 +43,6 @@ public class VipStatus {
     protected ServletContext servletContext;
 
     @GET
-    @Context
     public String checkStatus() {
         String statusFilePath = (String) servletContext.getAttribute(ATTRIBUTE_STATUS_FILE_PATH);
         @SuppressWarnings("unchecked")
@@ -55,6 +56,7 @@ public class VipStatus {
                 return "OK";
             }
         }
+        log.warn("Failed to access \"status.html\". The service is not ready");
         throw new WebApplicationException(Status.NOT_FOUND);
     }
 


[pulsar] 06/12: [fix][client] Fix ReconsumeLater will hang up if retryLetterProducer exception (#16655)

Posted by ma...@apache.org.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fbfc6bdbad6e65e840e4ae41b2d3f23d1b21c873
Author: Lei Zhiyuan <le...@gmail.com>
AuthorDate: Wed Jul 27 14:57:39 2022 +0800

    [fix][client] Fix ReconsumeLater will hang up if retryLetterProducer exception (#16655)
    
    (cherry picked from commit f5826203607fca99e57be7db7559f5529089d393)
---
 .../apache/pulsar/client/api/RetryTopicTest.java   | 60 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 11 ++--
 2 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index 48d20c677c1..00b48afb8bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -20,11 +20,17 @@ package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.util.RetryMessageUtil;
+import org.reflections.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -443,4 +449,58 @@ public class RetryTopicTest extends ProducerConsumerBase {
         checkConsumer.close();
     }
 
+
+    @Test(timeOut = 30000L)
+    public void testRetryTopicException() throws Exception {
+        final String topic = "persistent://my-property/my-ns/retry-topic";
+        final int maxRedeliveryCount = 2;
+        final int sendMessages = 1;
+        // subscribe before publish
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .enableRetry(true)
+                .receiverQueueSize(100)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(maxRedeliveryCount)
+                        .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+                        .build())
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+        producer.close();
+
+        // mock a retry producer exception when reconsumelater is called
+        MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
+        List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
+        for (ConsumerImpl<byte[]> c : consumers) {
+            Set<Field> deadLetterPolicyField =
+                    ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));
+
+            if (deadLetterPolicyField.size() != 0) {
+                Field field = deadLetterPolicyField.iterator().next();
+                field.setAccessible(true);
+                DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
+                deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
+            }
+        }
+        Message<byte[]> message = consumer.receive();
+        log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+        try {
+            consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
+        } catch (PulsarClientException.InvalidTopicNameException e) {
+            assertEquals(e.getClass(), PulsarClientException.InvalidTopicNameException.class);
+        } catch (Exception e) {
+            fail("exception should be PulsarClientException.InvalidTopicNameException");
+        }
+        consumer.close();
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 46a6264ae73..ef06a2e71aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -574,7 +574,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                             .create();
                 }
             } catch (Exception e) {
-                log.error("Create retry letter producer exception with topic: {}", deadLetterPolicy.getRetryLetterTopic(), e);
+                log.error("Create retry letter producer exception with topic: {}",
+                        deadLetterPolicy.getRetryLetterTopic(), e);
+                return FutureUtil.failedFuture(e);
             } finally {
                 createProducerLock.writeLock().unlock();
             }
@@ -638,14 +640,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                             });
                 }
             } catch (Exception e) {
-                log.error("Send to retry letter topic exception with topic: {}, messageId: {}", retryLetterProducer.getTopic(), messageId, e);
-                Set<MessageId> messageIds = Collections.singleton(messageId);
-                unAckedMessageTracker.remove(messageId);
-                redeliverUnacknowledgedMessages(messageIds);
+                result.completeExceptionally(e);
             }
         }
         MessageId finalMessageId = messageId;
         result.exceptionally(ex -> {
+            log.error("Send to retry letter topic exception with topic: {}, messageId: {}",
+                    retryLetterProducer.getTopic(), finalMessageId, ex);
             Set<MessageId> messageIds = Collections.singleton(finalMessageId);
             unAckedMessageTracker.remove(finalMessageId);
             redeliverUnacknowledgedMessages(messageIds);

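Note on the patch above: as the commit title describes, a failure while creating the retry-letter producer could leave the caller of `reconsumeLater` waiting indefinitely, because the exception was only logged and the returned future was never completed. The fix completes the future exceptionally instead. The following is a minimal standalone sketch of that pattern using only `java.util.concurrent`; it is not the `ConsumerImpl` code. The class `FailedFutureSketch`, the `createProducer` stand-in, and the rule that a topic containing `#` is invalid are all assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration only; not the ConsumerImpl implementation.
public class FailedFutureSketch {

    // Stand-in for producer creation. Assumption: a topic containing '#' is invalid.
    static void createProducer(String topic) {
        if (topic.contains("#")) {
            throw new IllegalArgumentException("Invalid topic name: " + topic);
        }
    }

    // Always returns a future that completes, either normally or exceptionally.
    // Logging the exception without completing the future would leave
    // any caller blocked in get() forever.
    public static CompletableFuture<Void> reconsumeLater(String retryTopic) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            createProducer(retryTopic);
            result.complete(null);
        } catch (Exception e) {
            // Propagate the failure to the caller instead of swallowing it.
            result.completeExceptionally(e);
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            reconsumeLater("#persistent://invalid-topic#").get();
            System.out.println("completed");
        } catch (ExecutionException e) {
            // The caller sees the original cause immediately instead of hanging.
            System.out.println("failed fast: " + e.getCause().getMessage());
        }
    }
}
```

With the failure routed through the future, a single `exceptionally` handler can do the logging and cleanup for both the creation path and the send path, which matches how the patch moves the `log.error` call into `result.exceptionally`.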

[pulsar] 04/12: fix Kafka source config when consumerConfigProperties='' (#16731)

Posted by ma...@apache.org.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4bc4659eadc1758b96291e91040c6e108f1e3757
Author: Bonan Hou <bo...@streamnative.io>
AuthorDate: Tue Jul 26 10:02:13 2022 +0800

    fix Kafka source config when consumerConfigProperties='' (#16731)
    
    (cherry picked from commit 1206a37246317cdafd356dc7bd0caf9c2cc9cbc7)
---
 .../org/apache/pulsar/io/kafka/KafkaSourceConfig.java |  2 ++
 .../io/kafka/source/KafkaAbstractSourceTest.java      | 19 +++++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
index 3fa687eceb6..332a080cc05 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.io.kafka;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import lombok.Data;
@@ -112,6 +113,7 @@ public class KafkaSourceConfig implements Serializable {
 
     public static KafkaSourceConfig load(Map<String, Object> map) throws IOException {
         ObjectMapper mapper = new ObjectMapper();
+        mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), KafkaSourceConfig.class);
     }
 }
\ No newline at end of file
diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index 1b676e2cce5..4bcf6ed8905 100644
--- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.io.kafka.source;
 
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -45,6 +46,7 @@ import java.util.concurrent.CompletableFuture;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.expectThrows;
 import static org.testng.Assert.fail;
 
@@ -104,6 +106,23 @@ public class KafkaAbstractSourceTest {
         source.close();
     }
 
+    @Test
+    public void loadConsumerConfigPropertiesFromMapTest() throws Exception {
+        Map<String, Object> config = new HashMap<>();
+        config.put("consumerConfigProperties", "");
+        KafkaSourceConfig kafkaSourceConfig = KafkaSourceConfig.load(config);
+        assertNotNull(kafkaSourceConfig);
+        assertNull(kafkaSourceConfig.getConsumerConfigProperties());
+
+        config.put("consumerConfigProperties", null);
+        kafkaSourceConfig = KafkaSourceConfig.load(config);
+        assertNull(kafkaSourceConfig.getConsumerConfigProperties());
+
+        config.put("consumerConfigProperties", ImmutableMap.of("foo", "bar"));
+        kafkaSourceConfig = KafkaSourceConfig.load(config);
+        assertEquals(kafkaSourceConfig.getConsumerConfigProperties(), ImmutableMap.of("foo", "bar"));
+    }
+
     @Test
     public final void loadFromYamlFileTest() throws IOException {
         File yamlFile = getFile("kafkaSourceConfig.yaml");


[pulsar] 07/12: [fix][client] Fix load trust certificate (#16789)

Posted by ma...@apache.org.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dbfbea457e8cfed1be2839c0a854c81ca00cd959
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Fri Jul 29 13:47:16 2022 +0800

    [fix][client] Fix load trust certificate (#16789)
    
    (cherry picked from commit 5d0eb9b71f911065fabb3668ad4932e2a03afe5a)
---
 .../client/api/ClientAuthenticationTlsTest.java    | 183 +++++++++++++++++++++
 .../pulsar/client/api/ProducerConsumerBase.java    |  21 ++-
 .../apache/pulsar/common/util/SecurityUtility.java |   7 +-
 3 files changed, 208 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientAuthenticationTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientAuthenticationTlsTest.java
new file mode 100644
index 00000000000..289a7a6797d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientAuthenticationTlsTest.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-api")
+public class ClientAuthenticationTlsTest extends ProducerConsumerBase {
+    private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
+    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
+
+    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+
+    private final Authentication authenticationTls =
+            new AuthenticationTls(TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH);
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+
+        conf.setClusterName(configClusterName);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        conf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+        conf.setTlsAllowInsecureConnection(false);
+
+        conf.setBrokerClientTlsEnabled(true);
+        conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        conf.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+        conf.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+    }
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        setupDefaultTenantAndNamespace();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuilder) {
+        super.customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
+        pulsarAdminBuilder.authentication(authenticationTls);
+    }
+
+    @Test
+    public void testAdminWithTrustCert() throws PulsarClientException, PulsarAdminException {
+        @Cleanup
+        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getPulsar().getWebServiceAddressTls())
+                .sslProvider("JDK")
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .build();
+        pulsarAdmin.clusters().getClusters();
+    }
+
+    @Test
+    public void testAdminWithFull() throws PulsarClientException, PulsarAdminException {
+        @Cleanup
+        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getPulsar().getWebServiceAddressTls())
+                .sslProvider("JDK")
+                .authentication(authenticationTls)
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .build();
+        pulsarAdmin.clusters().getClusters();
+    }
+
+    @Test
+    public void testAdminWithCertAndKey() throws PulsarClientException, PulsarAdminException {
+        @Cleanup
+        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getPulsar().getWebServiceAddressTls())
+                .sslProvider("JDK")
+                .authentication(authenticationTls)
+                .build();
+        PulsarAdminException adminException =
+                expectThrows(PulsarAdminException.class, () -> pulsarAdmin.clusters().getClusters());
+        assertTrue(adminException.getMessage().contains("PKIX path"));
+    }
+
+    @Test
+    public void testAdminWithoutTls() throws PulsarClientException, PulsarAdminException {
+        @Cleanup
+        PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getPulsar().getWebServiceAddressTls())
+                .sslProvider("JDK")
+                .build();
+        PulsarAdminException adminException =
+                expectThrows(PulsarAdminException.class, () -> pulsarAdmin.clusters().getClusters());
+        assertTrue(adminException.getMessage().contains("PKIX path"));
+    }
+
+    @Test
+    public void testClientWithTrustCert() throws PulsarClientException, PulsarAdminException {
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPulsar().getBrokerServiceUrlTls())
+                .sslProvider("JDK")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .build();
+        @Cleanup
+        Producer<byte[]> ignored = pulsarClient.newProducer().topic(UUID.randomUUID().toString()).create();
+    }
+
+    @Test
+    public void testClientWithFull() throws PulsarClientException, PulsarAdminException {
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPulsar().getBrokerServiceUrlTls())
+                .sslProvider("JDK")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .authentication(authenticationTls)
+                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .build();
+        @Cleanup
+        Producer<byte[]> ignored = pulsarClient.newProducer().topic(UUID.randomUUID().toString()).create();
+    }
+
+    @Test
+    public void testClientWithCertAndKey() throws PulsarClientException {
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPulsar().getBrokerServiceUrlTls())
+                .sslProvider("JDK")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .authentication(authenticationTls)
+                .build();
+        assertThrows(PulsarClientException.class,
+                () -> pulsarClient.newProducer().topic(UUID.randomUUID().toString()).create());
+    }
+
+    @Test
+    public void testClientWithoutTls() throws PulsarClientException, PulsarAdminException {
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(getPulsar().getBrokerServiceUrlTls())
+                .sslProvider("JDK")
+                .operationTimeout(3, TimeUnit.SECONDS)
+                .build();
+        assertThrows(PulsarClientException.class,
+                () -> pulsarClient.newProducer().topic(UUID.randomUUID().toString()).create());
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
index eae107cb8f8..1b8a16822db 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
@@ -26,7 +26,7 @@ import java.util.Set;
 
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -63,6 +63,25 @@ public abstract class ProducerConsumerBase extends MockedPulsarServiceBaseTest {
         Assert.assertTrue(messagesReceived.add(receivedMessage), "Received duplicate message " + receivedMessage);
     }
 
+    protected void setupDefaultTenantAndNamespace() throws Exception {
+        final String tenant = "public";
+        final String namespace = tenant + "/default";
+
+        if (!admin.clusters().getClusters().contains(configClusterName)) {
+            admin.clusters().createCluster(configClusterName,
+                    ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        }
+
+        if (!admin.tenants().getTenants().contains(tenant)) {
+            admin.tenants().createTenant(tenant, TenantInfo.builder().allowedClusters(
+                    Sets.newHashSet(configClusterName)).build());
+        }
+
+        if (!admin.namespaces().getNamespaces(tenant).contains(namespace)) {
+            admin.namespaces().createNamespace(namespace);
+        }
+    }
+
     private static final Random random = new Random();
 
     protected String newTopicName() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
index 5abad5924c4..a6d900d32bc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
@@ -250,8 +250,11 @@ public class SecurityUtility {
         if (allowInsecureConnection) {
             sslContexBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
         } else {
-            TrustManagerProxy trustManager = new TrustManagerProxy(trustCertsFilePath, refreshDurationSec, executor);
-            sslContexBuilder.trustManager(trustManager);
+            if (StringUtils.isNotBlank(trustCertsFilePath)) {
+                TrustManagerProxy trustManager =
+                        new TrustManagerProxy(trustCertsFilePath, refreshDurationSec, executor);
+                sslContexBuilder.trustManager(trustManager);
+            }
         }
         return sslContexBuilder.build();
     }


[pulsar] 03/12: add artifactSet to pom.xml for pulsar-functions-local-runner (#16565)

Posted by ma...@apache.org.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6bdffab17dbe7e2a007fba7d6f0114f72a3ee4d3
Author: Bonan Hou <bo...@streamnative.io>
AuthorDate: Mon Jul 25 21:14:39 2022 +0800

    add artifactSet to pom.xml for pulsar-functions-local-runner (#16565)
    
    (cherry picked from commit 52d8fe03160648f14dc196edaa87921064a1b2b2)
---
 pulsar-functions/localrun-shaded/pom.xml | 66 +++++++++++++++++++++++++++-----
 1 file changed, 56 insertions(+), 10 deletions(-)

diff --git a/pulsar-functions/localrun-shaded/pom.xml b/pulsar-functions/localrun-shaded/pom.xml
index c6f1957f88c..9d081892cbf 100644
--- a/pulsar-functions/localrun-shaded/pom.xml
+++ b/pulsar-functions/localrun-shaded/pom.xml
@@ -83,7 +83,7 @@
                             <target>
                                 <!-- shade the AsyncHttpClient ahc-default.properties files -->
                                 <replace token= "org.asynchttpclient."
-                                         value="org.apache.pulsar.shade.org.asynchttpclient."
+                                         value="org.apache.pulsar.functions.runtime.shaded.org.asynchttpclient."
                                          file="${project.build.directory}/classes/org/asynchttpclient/config/ahc-default.properties"/>
                             </target>
                         </configuration>
@@ -106,6 +106,40 @@
                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                 <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
                             </transformers>
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.pulsar:*</include>
+                                    <include>org.apache.bookkeeper:*</include>
+                                    <include>commons-*:*</include>
+                                    <include>org.apache.commons:*</include>
+                                    <include>com.fasterxml.jackson.*:*</include>
+                                    <include>io.netty:*</include>
+                                    <include>com.google.*:*</include>
+                                    <include>javax.servlet:*</include>
+                                    <include>org.reactivestreams:reactive-streams</include>
+                                    <include>org.apache.commons:*</include>
+                                    <include>io.swagger:*</include>
+                                    <include>org.yaml:snakeyaml</include>
+                                    <include>io.perfmark:*</include>
+                                    <include>io.prometheus:*</include>
+                                    <include>io.prometheus.jmx:*</include>
+                                    <include>javax.ws.rs:*</include>
+                                    <include>org.tukaani:xz</include>
+                                    <include>com.github.zafarkhaja:java-semver</include>
+                                    <include>net.java.dev.jna:*</include>
+                                    <include>org.apache.zookeeper:*</include>
+                                    <include>com.thoughtworks.paranamer:paranamer</include>
+                                    <include>jline:*</include>
+                                    <include>org.rocksdb:*</include>
+                                    <include>org.eclipse.jetty*:*</include>
+                                    <include>org.apache.avro:avro</include>
+                                    <include>com.beust:*</include>
+                                    <include>net.jodah:*</include>
+                                    <include>io.airlift:*</include>
+                                    <include>com.yahoo.datasketches:*</include>
+                                    <include>io.netty.resolver:*</include>
+                                </includes>
+                            </artifactSet>
                             <filters>
                                 <filter>
                                     <artifact>org.apache.pulsar:pulsar-client-original</artifact>
@@ -225,10 +259,10 @@
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog</shadedPattern>
                                 </relocation>
                                 <!-- Jackson cannot be shaded, this is causing java.lang.NoSuchMethodError when calling getThreadLocalYaml-->
-                                <!-- <relocation>
+                                <relocation>
                                     <pattern>com.fasterxml</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.fasterxml</shadedPattern>
-                                </relocation> -->
+                                </relocation>
                                 <relocation>
                                     <pattern>org.inferred</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.inferred</shadedPattern>
@@ -246,10 +280,10 @@
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.dlshade</shadedPattern>
                                 </relocation>
                                 <!-- This refers to an older version of Jackson -->
-                                <!-- <relocation>
+                                <relocation>
                                     <pattern>org.codehaus.jackson</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.jackson</shadedPattern>
-                                </relocation> -->
+                                </relocation>
                                 <relocation>
                                     <pattern>net.java.dev.jna</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.java.dev.jna</shadedPattern>
@@ -270,6 +304,10 @@
                                     <pattern>io.prometheus</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.prometheus</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>io.prometheus.jmx</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.prometheus.jmx</shadedPattern>
+                                </relocation>
                                 <relocation>
                                     <pattern>org.apache.zookeeper</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper</shadedPattern>
@@ -347,18 +385,22 @@
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.avo.shaded</shadedPattern>
                                 </relocation>
                                 <relocation>
-                                    <pattern>com.yahoo</pattern>
-                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.yahoo</shadedPattern>
+                                    <pattern>com.yahoo.datasketches</pattern>
+                                    <shadedPattern>org.apache.pulsar.shaded.com.yahoo.datasketches</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.yahoo.sketches</pattern>
+                                    <shadedPattern>org.apache.pulsar.shaded.com.yahoo.sketches</shadedPattern>
                                 </relocation>
                                 <relocation>
                                     <pattern>com.beust</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.beust</shadedPattern>
                                 </relocation>
                                 <!-- Netty cannot be shaded, this is causing java.lang.NoSuchMethodError -->
-                                <!-- <relocation>
+                                <relocation>
                                     <pattern>io.netty</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.netty</shadedPattern>
-                                </relocation> -->
+                                </relocation>
                                 <relocation>
                                     <pattern>org.hamcrest</pattern>
                                     <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.hamcrest</shadedPattern>
@@ -382,7 +424,11 @@
                                 -->
                                 <relocation>
                                     <pattern>org.asynchttpclient</pattern>
-                                    <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.asynchttpclient</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>io.airlift</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.airlift</shadedPattern>
                                 </relocation>
                                 <!-- DONT ever shade log4j, otherwise logging won't work anymore in running functions in process mode
                                 <relocation>


[pulsar] 01/12: Exclude the Netty Reactive Stream from asynchttpclient (#16312)

Posted by ma...@apache.org.

mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ba24b40a1f9e9eb3c394e7180149532161829198
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Jul 5 14:48:34 2022 +0800

    Exclude the Netty Reactive Stream from asynchttpclient (#16312)
    
    * Exclude the Netty Reactive Stream from asynchttpclient
    ---
    
    *Motivation*
    
    We upgraded Netty Reactive Streams in PR #15990, but
    asynchttpclient still pulls in its own copy transitively.
    Exclude that copy and use the project's managed
    dependency instead, which addresses the CVE.
    
    * Add the related dependency to the submodules
    
    (cherry picked from commit f9e89edee9ccb88c3656443b1cf6ffbb0aa1ac55)
---
 pom.xml                     | 4 ++++
 pulsar-client-tools/pom.xml | 4 ++++
 pulsar-client/pom.xml       | 5 +++++
 3 files changed, 13 insertions(+)

diff --git a/pom.xml b/pom.xml
index 64be13ca11d..b17952cfc3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -272,6 +272,10 @@ flexible messaging model and an intuitive client API.</description>
             <groupId>io.netty</groupId>
             <artifactId>*</artifactId>
           </exclusion>
+          <exclusion>
+            <groupId>com.typesafe.netty</groupId>
+            <artifactId>netty-reactive-streams</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
 
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index c20458d5129..d74731449db 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -72,6 +72,10 @@
       <groupId>org.asynchttpclient</groupId>
       <artifactId>async-http-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.typesafe.netty</groupId>
+      <artifactId>netty-reactive-streams</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 233b4f9e5b4..ebac828acb1 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -94,6 +94,11 @@
       <artifactId>async-http-client</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.typesafe.netty</groupId>
+      <artifactId>netty-reactive-streams</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>