Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/04/14 02:26:24 UTC

[pulsar] branch branch-2.7 updated (003f3ed -> a0c9b6a)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 003f3ed  Pass envirnoment variables to the docker container when building whee… (#10043)
     new 05e1ccd  [PIP-60] Add TLS SNI support for cpp and python clients (#8957)
     new 5a2f9ea  [Tiered Storage] Prevent Class Loader Leak; Restore Offloader Directory Override (#9878)
     new 8ac9cfb  [Messaging] Fix delay message block (#10078)
     new 43c529f  Avoid spammy logs in case of BK problems (#10088)
     new bb41022  Fix AutoConsumeSchema KeyValue encoding (#10089)
     new 0b3e510  Fix: seemingly equal ClientConfigurationData's objects end up not being equal (#10091)
     new f3b5ff7  Fix 8115 Some partitions get stuck after adding additional consumers to the KEY_SHARED subscriptions (#10096)
     new bc4cb4c  Fix MessagePublishBufferThrottleTest flaky-test (#8733)
     new 38fae22  [flaky test] Fix unit tests that occasionally fail (#9226)
     new a0c9b6a  Upgrade Jetty libraries to 9.4.39.v20210325 (#10177)

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 distribution/server/src/assemble/LICENSE.bin.txt   |  34 +++---
 .../bookkeeper/mledger/offload/OffloaderUtils.java |   4 +-
 .../mledger/offload/OffloadersCache.java           |  68 +++++++++++
 .../mledger/offload/OffloadersCacheTest.java       |  62 ++++++++++
 pom.xml                                            |   2 +-
 .../org/apache/pulsar/broker/PulsarService.java    |  13 ++-
 .../delayed/InMemoryDelayedDeliveryTracker.java    |   9 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   4 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |  15 ++-
 .../broker/admin/IncrementPartitionsTest.java      |  13 ++-
 .../pulsar/broker/admin/TopicMessageTTLTest.java   |   1 +
 .../pulsar/broker/admin/v1/V1_AdminApiTest2.java   |   1 +
 .../broker/service/ConsumedLedgersTrimTest.java    |   7 +-
 .../service/MessagePublishBufferThrottleTest.java  |   8 +-
 .../broker/service/ReplicatorRateLimiterTest.java  |   3 +
 .../pulsar/broker/service/ReplicatorTestBase.java  | 129 +++++++++++++--------
 .../service/persistent/TopicDuplicationTest.java   |  28 ++---
 pulsar-client-cpp/lib/ClientConnection.cc          |  14 ++-
 .../client/impl/conf/ClientConfigurationData.java  |   2 +-
 .../client/impl/schema/AutoConsumeSchema.java      |   3 +-
 .../impl/conf/ConfigurationDataUtilsTest.java      |  12 ++
 .../client/impl/schema/KeyValueSchemaTest.java     |   9 ++
 .../apache/pulsar/functions/utils/Exceptions.java  |  12 ++
 .../pulsar/functions/utils/ExceptionsTest.java     |  27 +++++
 pulsar-sql/presto-distribution/LICENSE             |  14 +--
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  11 +-
 .../integration/messaging/DelayMessagingTest.java  | 105 +++++++++++++++++
 .../src/test/resources/pulsar-messaging.xml        |   4 +-
 28 files changed, 492 insertions(+), 122 deletions(-)
 create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
 create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java
 create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/DelayMessagingTest.java

[pulsar] 02/10: [Tiered Storage] Prevent Class Loader Leak; Restore Offloader Directory Override (#9878)

Posted by yo...@apache.org.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5a2f9ea0384f5e2e4b579443d157336853b450f9
Author: Michael Marshall <47...@users.noreply.github.com>
AuthorDate: Thu Apr 1 13:07:32 2021 -0600

    [Tiered Storage] Prevent Class Loader Leak; Restore Offloader Directory Override (#9878)
    
    In Pulsar 2.7.0, there is a class loader leak. It looks like https://github.com/apache/pulsar/pull/8739 fixed the leak by only loading the offloader classes for the directory configured in `broker.conf`. However, the solution in https://github.com/apache/pulsar/pull/8739 ignores the fact that an offload policy can override the offloaders directory. As such, there could be a regression in 2.7.1 if users are providing multiple offload directories.
    
    This PR returns the functionality without reintroducing the class loader leak.
    
    Update the `PulsarService` and the `PulsarConnectorCache` classes to use a map from directory strings to `Offloaders`.
    
    The new `Map` has keys of type `String`. We could instead use keys of type `Path` and normalize the paths so that `./offloaders` and `offloaders` result in a single class loader. However, the `normalize` method in the `Path` class carries a warning about symbolic links. As such, I went with the basic `String` approach, which might lead to some duplication of loaded classes. Below is the javadoc for `normalize`, in case that helps for a design decision.
    
    ```java
      /**
         * Returns a path that is this path with redundant name elements eliminated.
         *
         * <p> The precise definition of this method is implementation dependent but
         * in general it derives from this path, a path that does not contain
         * <em>redundant</em> name elements. In many file systems, the "{@code .}"
         * and "{@code ..}" are special names used to indicate the current directory
         * and parent directory. In such file systems all occurrences of "{@code .}"
         * are considered redundant. If a "{@code ..}" is preceded by a
         * non-"{@code ..}" name then both names are considered redundant (the
         * process to identify such names is repeated until it is no longer
         * applicable).
         *
         * <p> This method does not access the file system; the path may not locate
         * a file that exists. Eliminating "{@code ..}" and a preceding name from a
         * path may result in the path that locates a different file than the original
         * path. This can arise when the preceding name is a symbolic link.
         *
         * @return  the resulting path or this path if it does not contain
         *          redundant name elements; an empty path is returned if this path
         *          does have a root component and all name elements are redundant
         *
         * @see #getParent
         * @see #toRealPath
         */
        Path normalize();
    ```
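
To make the trade-off above concrete, here is a minimal sketch of how raw strings and normalized paths behave as keys. The class `OffloaderKeyDemo` and the helper `normalizedKey` are hypothetical names used only for illustration; they are not part of the patch, which keeps plain `String` keys.

```java
import java.nio.file.Paths;

class OffloaderKeyDemo {
    // Hypothetical helper (not in the patch): build a cache key from the
    // absolute, normalized form of the configured directory.
    static String normalizedKey(String directory) {
        return Paths.get(directory).toAbsolutePath().normalize().toString();
    }

    public static void main(String[] args) {
        String a = "./offloaders";
        String b = "offloaders";
        // Raw strings differ, so a String-keyed map would load the directory twice.
        System.out.println("raw keys equal: " + a.equals(b));
        // Normalization removes the redundant "." element, so both map to one key.
        System.out.println("normalized keys equal: "
                + normalizedKey(a).equals(normalizedKey(b)));
    }
}
```

As the javadoc warns, `normalize` works purely on the path text, so a `..` that follows a symbolic link could collapse to a different directory than the one actually configured.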
    
    This change is a code cleanup without any test coverage that should be covered by other tests. If required, I can create some tests.
    
    (cherry picked from commit 6c3ebbb01415cdfe094650ae0eeeea6dcc224e87)
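
The load-once behaviour described in this message can be sketched independently of Pulsar. The example below is an illustration under assumptions: `LoadOnceCache` is a hypothetical generic stand-in for `OffloadersCache`, and the loader lambda stands in for `OffloaderUtils.searchForOffloaders`. It relies only on `ConcurrentHashMap.computeIfAbsent`, which invokes the mapping function at most once per absent key.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class LoadOnceCacheDemo {
    // Hypothetical stand-in for OffloadersCache: one loaded value per directory string.
    static final class LoadOnceCache<V> {
        private final Map<String, V> loaded = new ConcurrentHashMap<>();
        private final Function<String, V> loader;

        LoadOnceCache(Function<String, V> loader) {
            this.loader = loader;
        }

        V getOrLoad(String directory) {
            // The loader runs only when the key is absent.
            return loaded.computeIfAbsent(directory, loader);
        }
    }

    public static void main(String[] args) {
        AtomicInteger loads = new AtomicInteger();
        LoadOnceCache<Object> cache = new LoadOnceCache<>(dir -> {
            loads.incrementAndGet();   // count how often a directory is actually loaded
            return new Object();
        });

        Object first = cache.getOrLoad("./offloaders");
        Object second = cache.getOrLoad("./offloaders");
        Object other = cache.getOrLoad("offloaders");

        System.out.println(first == second); // true: the same key reuses the loaded value
        System.out.println(first == other);  // false: a different string is a different key
        System.out.println(loads.get());     // 2
    }
}
```

The last two lines show the duplication the author accepts: two spellings of the same directory are loaded separately because the keys are compared as strings.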
---
 .../bookkeeper/mledger/offload/OffloaderUtils.java |  4 +-
 .../mledger/offload/OffloadersCache.java           | 68 ++++++++++++++++++++++
 .../mledger/offload/OffloadersCacheTest.java       | 62 ++++++++++++++++++++
 .../org/apache/pulsar/broker/PulsarService.java    | 13 +++--
 .../pulsar/sql/presto/PulsarConnectorCache.java    | 11 ++--
 5 files changed, 144 insertions(+), 14 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
index 5243691..bc747d7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java
@@ -114,8 +114,8 @@ public class OffloaderUtils {
         }
     }
 
-    public static Offloaders searchForOffloaders(String connectorsDirectory, String narExtractionDirectory) throws IOException {
-        Path path = Paths.get(connectorsDirectory).toAbsolutePath();
+    public static Offloaders searchForOffloaders(String offloadersPath, String narExtractionDirectory) throws IOException {
+        Path path = Paths.get(offloadersPath).toAbsolutePath();
         log.info("Searching for offloaders in {}", path);
 
         Offloaders offloaders = new Offloaders();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
new file mode 100644
index 0000000..e80c75b
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloadersCache.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of an Offloaders. The main purpose of this class is to
+ * ensure that an Offloaders directory is only loaded once.
+ */
+@Slf4j
+public class OffloadersCache implements AutoCloseable {
+
+    private Map<String, Offloaders> loadedOffloaders = new ConcurrentHashMap<>();
+
+    /**
+     * Method to load an Offloaders directory or to get an already loaded Offloaders directory.
+     *
+     * @param offloadersPath - the directory to search the offloaders nar files
+     * @param narExtractionDirectory - the directory to use for extraction
+     * @return the loaded offloaders class
+     * @throws IOException when fail to retrieve the pulsar offloader class
+     */
+    public Offloaders getOrLoadOffloaders(String offloadersPath, String narExtractionDirectory) {
+        return loadedOffloaders.computeIfAbsent(offloadersPath,
+                (directory) -> {
+                    try {
+                        return OffloaderUtils.searchForOffloaders(directory, narExtractionDirectory);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    @Override
+    public void close() {
+        loadedOffloaders.values().forEach(offloaders -> {
+            try {
+                offloaders.close();
+            } catch (Exception e) {
+                log.error("Error while closing offloader.", e);
+                // Even if the offloader fails to close, the graceful shutdown process continues
+            }
+        });
+        // Don't want to hold on to references to closed offloaders
+        loadedOffloaders.clear();
+    }
+}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java
new file mode 100644
index 0000000..1c2cd85
--- /dev/null
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/offload/OffloadersCacheTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.offload;
+
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.eq;
+import static org.testng.Assert.assertSame;
+
+@PrepareForTest({OffloaderUtils.class})
+@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.pulsar.common.nar.*"})
+public class OffloadersCacheTest {
+
+    // Necessary to make PowerMockito.mockStatic work with TestNG.
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @Test
+    public void testLoadsOnlyOnce() throws Exception {
+        Offloaders expectedOffloaders = new Offloaders();
+
+        PowerMockito.mockStatic(OffloaderUtils.class);
+        PowerMockito.when(OffloaderUtils.searchForOffloaders(eq("./offloaders"), eq("/tmp")))
+                .thenReturn(expectedOffloaders);
+
+        OffloadersCache cache = new OffloadersCache();
+
+        // Call a first time to load the offloader
+        Offloaders offloaders1 = cache.getOrLoadOffloaders("./offloaders", "/tmp");
+
+        assertSame(offloaders1, expectedOffloaders, "The offloaders should be the mocked one.");
+
+        // Call a second time to get the stored offloader
+        Offloaders offloaders2 = cache.getOrLoadOffloaders("./offloaders", "/tmp");
+
+        assertSame(offloaders2, expectedOffloaders, "The offloaders should be the mocked one.");
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index fcabff2..ef8df6f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -66,9 +66,9 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
-import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
 import org.apache.bookkeeper.mledger.offload.Offloaders;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.mledger.offload.OffloadersCache;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
@@ -183,7 +183,7 @@ public class PulsarService implements AutoCloseable {
     private final ScheduledExecutorService loadManagerExecutor;
     private ScheduledExecutorService compactorExecutor;
     private OrderedScheduler offloaderScheduler;
-    private Offloaders offloaderManager = new Offloaders();
+    private OffloadersCache offloadersCache = new OffloadersCache();
     private LedgerOffloader defaultOffloader;
     private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
     private ScheduledFuture<?> loadReportTask = null;
@@ -370,7 +370,7 @@ public class PulsarService implements AutoCloseable {
                 schemaRegistryService.close();
             }
 
-            offloaderManager.close();
+            offloadersCache.close();
 
             if (protocolHandlers != null) {
                 protocolHandlers.close();
@@ -482,8 +482,6 @@ public class PulsarService implements AutoCloseable {
             schemaRegistryService = SchemaRegistryService.create(
                     schemaStorage, config.getSchemaRegistryCompatibilityCheckers());
 
-            this.offloaderManager = OffloaderUtils.searchForOffloaders(
-                    config.getOffloadersDirectory(), config.getNarExtractionDirectory());
             this.defaultOffloader = createManagedLedgerOffloader(
                     OffloadPolicies.create(this.getConfiguration().getProperties()));
             this.brokerInterceptor = BrokerInterceptors.load(config);
@@ -932,7 +930,10 @@ public class PulsarService implements AutoCloseable {
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                     "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
-                LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
+                Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
+                        offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());
+
+                LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
                         offloadPolicies.getManagedLedgerOffloadDriver());
                 try {
                     return offloaderFactory.create(
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index c10312a..757995a 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -35,8 +35,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
-import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
 import org.apache.bookkeeper.mledger.offload.Offloaders;
+import org.apache.bookkeeper.mledger.offload.OffloadersCache;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PulsarVersion;
@@ -57,7 +57,7 @@ public class PulsarConnectorCache {
 
     private final StatsProvider statsProvider;
     private OrderedScheduler offloaderScheduler;
-    private Offloaders offloaderManager;
+    private OffloadersCache offloadersCache = new OffloadersCache();
     private LedgerOffloader defaultOffloader;
     private Map<NamespaceName, LedgerOffloader> offloaderMap = new ConcurrentHashMap<>();
 
@@ -155,9 +155,9 @@ public class PulsarConnectorCache {
                 checkNotNull(offloadPolicies.getOffloadersDirectory(),
                         "Offloader driver is configured to be '%s' but no offloaders directory is configured.",
                         offloadPolicies.getManagedLedgerOffloadDriver());
-                this.offloaderManager = OffloaderUtils.searchForOffloaders(offloadPolicies.getOffloadersDirectory(),
+                Offloaders offloaders = offloadersCache.getOrLoadOffloaders(offloadPolicies.getOffloadersDirectory(),
                         pulsarConnectorConfig.getNarExtractionDirectory());
-                LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory(
+                LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
                         offloadPolicies.getManagedLedgerOffloadDriver());
 
                 try {
@@ -195,8 +195,7 @@ public class PulsarConnectorCache {
                 instance.statsProvider.stop();
                 instance.managedLedgerFactory.shutdown();
                 instance.offloaderScheduler.shutdown();
-                instance.offloaderManager.close();
-                instance = null;
+                instance.offloadersCache.close();
             }
         }
     }

[pulsar] 06/10: Fix: seemingly equal ClientConfigurationData's objects end up not being equal (#10091)

Posted by yo...@apache.org.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0b3e51029a2e1d51241aee91bed45b14a7c3425b
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Mon Apr 5 10:12:53 2021 -0700

    Fix: seemingly equal ClientConfigurationData's objects end up not being equal (#10091)
    
    * fixed equals for the ClientConfigurationData (found while fixing tests in pulsar-adapters repo)
    
    * added test/repro
    
    * CR feedback
    
    (cherry picked from commit 8a2c5cc2ba170ddce50712e48c053c187da98d60)
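
The following sketch illustrates why a lazily created default can break equality, and why a shared instance restores it. It is an illustration under assumptions: `NoAuth` and `Config` are hypothetical stand-ins for `AuthenticationDisabled` and `ClientConfigurationData`, and the sketch assumes that `equals()` compares the authentication object and that the authentication class does not override `equals()`.

```java
import java.util.Objects;

class SharedDefaultDemo {
    // Hypothetical stand-in for an authentication class without its own equals().
    static final class NoAuth {
        static final NoAuth INSTANCE = new NoAuth();
    }

    // Hypothetical stand-in for a config object whose equals() includes the auth field.
    static final class Config {
        private final String serviceUrl;
        private final boolean useSharedDefault;
        private NoAuth auth;

        Config(String serviceUrl, boolean useSharedDefault) {
            this.serviceUrl = serviceUrl;
            this.useSharedDefault = useSharedDefault;
        }

        NoAuth getAuth() {
            if (auth == null) {
                // Before the fix: a fresh object per config. After: one shared instance.
                auth = useSharedDefault ? NoAuth.INSTANCE : new NoAuth();
            }
            return auth;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Config)) return false;
            Config other = (Config) o;
            return serviceUrl.equals(other.serviceUrl)
                    && Objects.equals(getAuth(), other.getAuth());
        }

        @Override
        public int hashCode() {
            return Objects.hash(serviceUrl, getAuth());
        }
    }

    static boolean sameUrlConfigsEqual(boolean useSharedDefault) {
        Config a = new Config("pulsar://unknown:6650", useSharedDefault);
        Config b = new Config("pulsar://unknown:6650", useSharedDefault);
        return a.equals(b);
    }

    public static void main(String[] args) {
        System.out.println(sameUrlConfigsEqual(false)); // false: two distinct default objects
        System.out.println(sameUrlConfigsEqual(true));  // true: both use the shared instance
    }
}
```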
---
 .../pulsar/client/impl/conf/ClientConfigurationData.java     |  2 +-
 .../pulsar/client/impl/conf/ConfigurationDataUtilsTest.java  | 12 ++++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index b68caf1..ca81669 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -103,7 +103,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
 
     public Authentication getAuthentication() {
         if (authentication == null) {
-            this.authentication = new AuthenticationDisabled();
+            this.authentication = AuthenticationDisabled.INSTANCE;
         }
         return authentication;
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index a1bafbd..ba94726 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -138,4 +138,16 @@ public class ConfigurationDataUtilsTest {
         assertEquals(pulsarClient.getConfiguration().getStatsIntervalSeconds(), 80,
                 "builder default should overrite if set explicitly");
     }
+
+    @Test
+    public void testEquals() {
+        ClientConfigurationData confData1 = new ClientConfigurationData();
+        confData1.setServiceUrl("pulsar://unknown:6650");
+
+        ClientConfigurationData confData2 = new ClientConfigurationData();
+        confData2.setServiceUrl("pulsar://unknown:6650");
+
+        assertEquals(confData1, confData2);
+        assertEquals(confData1.hashCode(), confData2.hashCode());
+    }
 }

[pulsar] 10/10: Upgrade Jetty libraries to 9.4.39.v20210325 (#10177)

Posted by yo...@apache.org.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a0c9b6abe57f925f50b6be6e4ddf20fc26b1e6fc
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Sat Apr 10 10:31:24 2021 +0900

    Upgrade Jetty libraries to 9.4.39.v20210325 (#10177)
    
    (cherry picked from commit 56bad04d702311cffed2ba94ff35f170af185eda)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 34 ++++++++++++------------
 pom.xml                                          |  2 +-
 pulsar-sql/presto-distribution/LICENSE           | 14 +++++-----
 3 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 776f3b4..fff2916 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -430,23 +430,23 @@ The Apache Software License, Version 2.0
     - org.asynchttpclient-async-http-client-2.12.1.jar
     - org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
  * Jetty
-    - org.eclipse.jetty-jetty-client-9.4.35.v20201120.jar
-    - org.eclipse.jetty-jetty-continuation-9.4.35.v20201120.jar
-    - org.eclipse.jetty-jetty-http-9.4.35.v20201120.jar
-    - org.eclipse.jetty-jetty-io-9.4.35.v20201120.jar
-    - org.eclipse.jetty-jetty-proxy-9.4.35.v20201120.jar
-    - org.eclipse.jetty-jetty-security-9.4.35.v20201120.jar
-    - org.eclipse.jetty-jetty-server-9.4.35.v20201120.jar
-    - org.eclipse.jetty-jetty-servlet-9.4.35.v20201120.jar
-    - org.eclipse.jetty-jetty-servlets-9.4.35.v20201120.jar
-    - org.eclipse.jetty-jetty-util-9.4.35.v20201120.jar
-    - org.eclipse.jetty-jetty-util-ajax-9.4.35.v20201120.jar
-    - org.eclipse.jetty.websocket-javax-websocket-client-impl-9.4.35.v20201120.jar
-    - org.eclipse.jetty.websocket-websocket-api-9.4.35.v20201120.jar
-    - org.eclipse.jetty.websocket-websocket-client-9.4.35.v20201120.jar
-    - org.eclipse.jetty.websocket-websocket-common-9.4.35.v20201120.jar
-    - org.eclipse.jetty.websocket-websocket-server-9.4.35.v20201120.jar
-    - org.eclipse.jetty.websocket-websocket-servlet-9.4.35.v20201120.jar
+    - org.eclipse.jetty-jetty-client-9.4.39.v20210325.jar
+    - org.eclipse.jetty-jetty-continuation-9.4.39.v20210325.jar
+    - org.eclipse.jetty-jetty-http-9.4.39.v20210325.jar
+    - org.eclipse.jetty-jetty-io-9.4.39.v20210325.jar
+    - org.eclipse.jetty-jetty-proxy-9.4.39.v20210325.jar
+    - org.eclipse.jetty-jetty-security-9.4.39.v20210325.jar
+    - org.eclipse.jetty-jetty-server-9.4.39.v20210325.jar
+    - org.eclipse.jetty-jetty-servlet-9.4.39.v20210325.jar
+    - org.eclipse.jetty-jetty-servlets-9.4.39.v20210325.jar
+    - org.eclipse.jetty-jetty-util-9.4.39.v20210325.jar
+    - org.eclipse.jetty-jetty-util-ajax-9.4.39.v20210325.jar
+    - org.eclipse.jetty.websocket-javax-websocket-client-impl-9.4.39.v20210325.jar
+    - org.eclipse.jetty.websocket-websocket-api-9.4.39.v20210325.jar
+    - org.eclipse.jetty.websocket-websocket-client-9.4.39.v20210325.jar
+    - org.eclipse.jetty.websocket-websocket-common-9.4.39.v20210325.jar
+    - org.eclipse.jetty.websocket-websocket-server-9.4.39.v20210325.jar
+    - org.eclipse.jetty.websocket-websocket-servlet-9.4.39.v20210325.jar
  * SnakeYaml -- org.yaml-snakeyaml-1.26.jar
  * RocksDB - org.rocksdb-rocksdbjni-6.10.2.jar
  * Google Error Prone Annotations - com.google.errorprone-error_prone_annotations-2.3.4.jar
diff --git a/pom.xml b/pom.xml
index 356322b..0e7d0e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,7 @@ flexible messaging model and an intuitive client API.</description>
     <zookeeper.version>3.5.7</zookeeper.version>
     <netty.version>4.1.60.Final</netty.version>
     <netty-tc-native.version>2.0.36.Final</netty-tc-native.version>
-    <jetty.version>9.4.35.v20201120</jetty.version>
+    <jetty.version>9.4.39.v20210325</jetty.version>
     <jersey.version>2.31</jersey.version>
     <athenz.version>1.8.38</athenz.version>
     <prometheus.version>0.5.0</prometheus.version>
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 8f4eef1..99217c75 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -439,13 +439,13 @@ The Apache Software License, Version 2.0
   * Java Assist
     - javassist-3.25.0-GA.jar
   * Jetty
-    - jetty-http-9.4.35.v20201120.jar
-    - jetty-io-9.4.35.v20201120.jar
-    - jetty-security-9.4.35.v20201120.jar
-    - jetty-server-9.4.35.v20201120.jar
-    - jetty-servlet-9.4.35.v20201120.jar
-    - jetty-util-9.4.35.v20201120.jar
-    - jetty-util-ajax-9.4.35.v20201120.jar
+    - jetty-http-9.4.39.v20210325.jar
+    - jetty-io-9.4.39.v20210325.jar
+    - jetty-security-9.4.39.v20210325.jar
+    - jetty-server-9.4.39.v20210325.jar
+    - jetty-servlet-9.4.39.v20210325.jar
+    - jetty-util-9.4.39.v20210325.jar
+    - jetty-util-ajax-9.4.39.v20210325.jar
   * Java Native Access
     - jna-4.2.0.jar
     - jna-5.3.1.jar

[pulsar] 03/10: [Messaging] Fix delay message block (#10078)

Posted by yo...@apache.org.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8ac9cfb8755bfd173ceaa4246bf5487f2c2c10e1
Author: ran <ga...@126.com>
AuthorDate: Fri Apr 9 17:37:22 2021 +0800

    [Messaging] Fix delay message block (#10078)
    
    Currently, in the Docker environment, if the consumer enables the retry feature and sets the retry topic in DeadLetterPolicy, the consumer will be blocked after receiving retry messages many times.
    
    The delay TimerTask may run before the timeout is reached. In the logs, the last `Timer triggered` entry appears after the `Start timer in 4958 millis` entry, but the time elapsed between the two entries is `4936` ms, which is less than `4958`. As a result, the hasMessageAvailable check returns false, and if there are no more messages to read, reading of the delayed messages is blocked. Please check logs below.
    
    (cherry picked from commit 31f831574c3a65bae0e76801facaf2deb0b17fbb)
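
The timing problem can be reproduced with the numbers quoted in this message. The sketch below is a simplified illustration, not the broker code: `Tracker` is a hypothetical class that stores only delivery timestamps, and the tick time of 1000 ms is an assumed value chosen for the example.

```java
import java.util.PriorityQueue;

class CutoffDemo {
    // Hypothetical simplified tracker: keeps delivery times (ms) in ascending order.
    static final class Tracker {
        private final PriorityQueue<Long> deliverAt = new PriorityQueue<>();
        private final long tickTimeMillis;

        Tracker(long tickTimeMillis) {
            this.tickTimeMillis = tickTimeMillis;
        }

        void add(long deliverAtMillis) {
            deliverAt.add(deliverAtMillis);
        }

        // Old check: the earliest message must already be due at "now".
        boolean strictCheck(long now) {
            return !deliverAt.isEmpty() && deliverAt.peek() <= now;
        }

        // New check: tolerate a timer that fires up to one tick early.
        boolean tolerantCheck(long now) {
            long cutOffTime = now + tickTimeMillis;
            return !deliverAt.isEmpty() && deliverAt.peek() <= cutOffTime;
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker(1000); // assumed tick time
        tracker.add(4958);                   // message due 4958 ms after the timer started
        long firedAt = 4936;                 // the timer fired 22 ms early

        System.out.println(tracker.strictCheck(firedAt));   // false: nothing is dispatched
        System.out.println(tracker.tolerantCheck(firedAt)); // true: the message is dispatched
    }
}
```

With the strict check, an early timer finds no message and nothing schedules another attempt. The patch addresses both halves: it widens the check by `tickTimeMillis` and calls `updateTimer()` whenever the check still fails.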
---
 .../delayed/InMemoryDelayedDeliveryTracker.java    |   9 +-
 .../integration/messaging/DelayMessagingTest.java  | 105 +++++++++++++++++++++
 .../src/test/resources/pulsar-messaging.xml        |   4 +-
 3 files changed, 116 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 5c37b81..bfb5b70 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -90,7 +90,14 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
      */
     @Override
     public boolean hasMessageAvailable() {
-        return !priorityQueue.isEmpty() && priorityQueue.peekN1() <= clock.millis();
+        // Tolerate the TimerTask running before the timeout is reached.
+        long cutOffTime = clock.millis() + tickTimeMillis;
+        boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= cutOffTime;
+        if (!hasMessageAvailable) {
+            // Re-arm the timer in case the first delayed message is due later than cutOffTime
+            updateTimer();
+        }
+        return hasMessageAvailable;
     }
 
     /**
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/DelayMessagingTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/DelayMessagingTest.java
new file mode 100644
index 0000000..8b06d1d
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/DelayMessagingTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.messaging;
+
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Delay messaging test.
+ */
+@Slf4j
+public class DelayMessagingTest extends PulsarTestSuite {
+
+    @Test(dataProvider = "ServiceUrls")
+    public void delayMsgBlockTest(String serviceUrl) throws Exception {
+        String nsName = generateNamespaceName();
+        pulsarCluster.createNamespace(nsName);
+
+        String topic = generateTopicName(nsName, "testDelayMsgBlock", true);
+        pulsarCluster.createPartitionedTopic(topic, 3);
+
+        String retryTopic = topic + "-RETRY";
+        String deadLetterTopic = topic + "-DLT";
+
+        @Cleanup
+        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        final int redeliverCnt = 10;
+        final int delayTimeSeconds = 5;
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("test")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .enableRetry(true)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(redeliverCnt)
+                        .retryLetterTopic(retryTopic)
+                        .deadLetterTopic(deadLetterTopic)
+                        .build())
+                .receiverQueueSize(100)
+                .ackTimeout(60, TimeUnit.SECONDS)
+                .subscribe();
+
+        producer.newMessage().value("hello".getBytes()).send();
+
+        // receive message at first time
+        Message<byte[]> message = consumer.receive(delayTimeSeconds * 2, TimeUnit.SECONDS);
+        Assert.assertNotNull(message, "Can't receive message at the first time.");
+        consumer.reconsumeLater(message, delayTimeSeconds, TimeUnit.SECONDS);
+
+        // receive retry messages
+        for (int i = 0; i < redeliverCnt; i++) {
+            message = consumer.receive(delayTimeSeconds * 2, TimeUnit.SECONDS);
+            Assert.assertNotNull(message, "Consumer can't receive message in double delayTimeSeconds time "
+                    + delayTimeSeconds * 2 + "s");
+            log.info("receive msg. reConsumeTimes: {}", message.getProperty("RECONSUMETIMES"));
+            consumer.reconsumeLater(message, delayTimeSeconds, TimeUnit.SECONDS);
+        }
+
+        @Cleanup
+        Consumer<byte[]> dltConsumer = pulsarClient.newConsumer()
+                .topic(deadLetterTopic)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test")
+                .subscribe();
+
+        message = dltConsumer.receive(10, TimeUnit.SECONDS);
+        Assert.assertNotNull(message, "Dead letter topic consumer can't receive message.");
+    }
+
+}
diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml
index 4af0a33..9045413 100644
--- a/tests/integration/src/test/resources/pulsar-messaging.xml
+++ b/tests/integration/src/test/resources/pulsar-messaging.xml
@@ -24,6 +24,8 @@
         <classes>
             <class name="org.apache.pulsar.tests.integration.messaging.PersistentTopicMessagingTest" />
             <class name="org.apache.pulsar.tests.integration.messaging.NonPersistentTopicMessagingTest" />
+            <class name="org.apache.pulsar.tests.integration.messaging.DelayMessagingTest" />
+            <class name="org.apache.pulsar.tests.integration.io.AvroKafkaSourceTest" />
         </classes>
     </test>
-</suite>
\ No newline at end of file
+</suite>
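
Note on the hasMessageAvailable() change above: the following is a minimal, self-contained sketch of the cut-off check, not Pulsar's actual tracker. The class name DelayedTrackerSketch, the timerUpdates counter (a stand-in for updateTimer()), and passing the current time as a parameter instead of reading a Clock are all assumptions made for illustration.

```java
import java.util.PriorityQueue;

/** Toy model of the cut-off check; hypothetical, not the real InMemoryDelayedDeliveryTracker. */
final class DelayedTrackerSketch {
    // Delivery timestamps in milliseconds, earliest first.
    private final PriorityQueue<Long> deliverAt = new PriorityQueue<>();
    private final long tickTimeMillis;
    // Counts how often the (hypothetical) timer would be re-armed.
    private int timerUpdates = 0;

    DelayedTrackerSketch(long tickTimeMillis) {
        this.tickTimeMillis = tickTimeMillis;
    }

    void add(long deliverAtMillis) {
        deliverAt.add(deliverAtMillis);
    }

    int timerUpdates() {
        return timerUpdates;
    }

    boolean hasMessageAvailable(long nowMillis) {
        // Treat anything due within the next tick as available, so a timer
        // that fires slightly early still sees the message.
        long cutOffTime = nowMillis + tickTimeMillis;
        boolean available = !deliverAt.isEmpty() && deliverAt.peek() <= cutOffTime;
        if (!available) {
            // Stand-in for updateTimer(): make sure a later wake-up is scheduled.
            timerUpdates++;
        }
        return available;
    }

    public static void main(String[] args) {
        DelayedTrackerSketch tracker = new DelayedTrackerSketch(1000);
        tracker.add(5000);
        System.out.println(tracker.hasMessageAvailable(3000)); // false: 5000 > 3000 + 1000
        System.out.println(tracker.hasMessageAvailable(4200)); // true: 5000 <= 4200 + 1000
    }
}
```

In this sketch the early check re-arms the timer once and the later check re-arms nothing, which mirrors the intent stated in the diff comments.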

[pulsar] 05/10: Fix AutoConsumeSchema KeyValue encoding (#10089)

Posted by yo...@apache.org.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bb410225394a3b0a9a1e87ece8d9845ce8b3a66e
Author: Vincent Royer <vr...@strapdata.com>
AuthorDate: Thu Apr 1 20:38:27 2021 +0200

    Fix AutoConsumeSchema KeyValue encoding (#10089)
    
    ### Motivation
    
    Keep the KeyValueEncodingType when auto-consuming a KeyValue schema.
    
    ### Modifications
    
    See the single commit.
    
    ### Verifying this change
    
    Added the unit test org.apache.pulsar.client.impl.schema.KeyValueSchemaTest.testKeyValueSchemaSeparatedEncoding,
    which checks that the encoding type is preserved.
    
    (cherry picked from commit 6717974eda5192666c7769efc87c80ecff381ce1)
---
 .../org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java  | 3 ++-
 .../org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java | 9 +++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 442ede1..405b9cc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -209,7 +209,8 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
                     KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo);
                 Schema<?> keySchema = getSchema(kvSchemaInfo.getKey());
                 Schema<?> valueSchema = getSchema(kvSchemaInfo.getValue());
-                return KeyValueSchema.of(keySchema, valueSchema);
+                return KeyValueSchema.of(keySchema, valueSchema,
+                        KeyValueSchemaInfo.decodeKeyValueEncodingType(schemaInfo));
             default:
                 throw new IllegalArgumentException("Retrieve schema instance from schema info for type '"
                     + schemaInfo.getType() + "' is not supported yet");
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index 9c903fa..4692290 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -388,4 +388,13 @@ public class KeyValueSchemaTest {
         assertEquals(foo, fooBack);
         assertEquals(bar, barBack);
     }
+
+    @Test
+    public void testKeyValueSchemaSeparatedEncoding() {
+        KeyValueSchema<String, String> keyValueSchema = (KeyValueSchema<String,String>)
+                KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED);
+        KeyValueSchema<String, String> keyValueSchema2 = (KeyValueSchema<String,String>)
+                AutoConsumeSchema.getSchema(keyValueSchema.getSchemaInfo());
+        assertEquals(keyValueSchema.getKeyValueEncodingType(), keyValueSchema2.getKeyValueEncodingType());
+    }
 }

[pulsar] 08/10: Fix MessagePublishBufferThrottleTest flaky-test (#8733)

Posted by yo...@apache.org.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bc4cb4ceae33bb80e5d5394aa5d671401e056992
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Sat Nov 28 00:40:45 2020 +0800

    Fix MessagePublishBufferThrottleTest flaky-test (#8733)
    
    (cherry picked from commit 34b1d5afee0609bbfa032f0ea9a479951b1fcc47)
---
 .../pulsar/broker/service/MessagePublishBufferThrottleTest.java    | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index 91ce32b..6dd4515 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -22,6 +22,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
@@ -38,9 +39,10 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
         //No-op
     }
 
+    @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
-        //No-op
+        super.internalCleanup();
     }
 
     @Test
@@ -70,7 +72,6 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
         }
         Thread.sleep(20);
         Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-        super.internalCleanup();
     }
 
     @Test
@@ -107,7 +108,6 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
             Assert.assertNotNull(future.get());
         }
         Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L);
-        super.internalCleanup();
     }
 
     @Test
@@ -153,6 +153,5 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
             Assert.assertNotNull(future.get());
         }
         Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L);
-        super.internalCleanup();
     }
 }

[pulsar] 04/10: Avoid spammy logs in case of BK problems (#10088)

Posted by yo...@apache.org.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 43c529f9465c71bebb2f17dbcd42326399a51b77
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Apr 1 14:46:19 2021 +0200

    Avoid spammy logs in case of BK problems (#10088)
    
    (cherry picked from commit cdba1c03efff2b908850b54456a1843649fb98fb)
---
 .../apache/pulsar/broker/service/ServerCnx.java    |  4 +++-
 .../apache/pulsar/functions/utils/Exceptions.java  | 12 ++++++++++
 .../pulsar/functions/utils/ExceptionsTest.java     | 27 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index d8f09b9..dd8ad4b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -131,6 +131,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SafeCollectionUtils;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
+import org.apache.pulsar.functions.utils.Exceptions;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.slf4j.Logger;
@@ -1179,7 +1180,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                 cause = new TopicNotFoundException("Topic Not Found.");
                             }
 
-                            if (!(cause instanceof ServiceUnitNotReadyException)) {
+                            if (!Exceptions.areExceptionsPresentInChain(cause,
+                                    ServiceUnitNotReadyException.class, ManagedLedgerException.class)) {
                                 // Do not print stack traces for expected exceptions
                                 log.error("[{}] Failed to create topic {}, producerId={}",
                                           remoteAddress, topicName, producerId, exception);
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Exceptions.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Exceptions.java
index 19b31c6..c1e804c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Exceptions.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Exceptions.java
@@ -48,4 +48,16 @@ public class Exceptions {
         return sw.toString();
     }
 
+    public static boolean areExceptionsPresentInChain(Throwable error, Class ... types) {
+        while (error != null) {
+            for (Class type : types) {
+                if (type.isInstance(error)) {
+                    return true;
+                }
+            }
+            error = error.getCause();
+        }
+        return false;
+    }
+
 }
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ExceptionsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ExceptionsTest.java
index 67e1406..f08c143 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ExceptionsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ExceptionsTest.java
@@ -21,7 +21,9 @@ package org.apache.pulsar.functions.utils;
 
 import static org.apache.pulsar.functions.utils.Exceptions.rethrowIOException;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.io.IOException;
@@ -77,4 +79,29 @@ public class ExceptionsTest {
         }
     }
 
+    @Test
+    public void testAreExceptionsPresentInChain() {
+        assertFalse(Exceptions.areExceptionsPresentInChain(null, IllegalStateException.class));
+    }
+
+    @Test
+    public void testAreExceptionsPresentInChain2() {
+        assertTrue(Exceptions.areExceptionsPresentInChain(new IllegalStateException(), IllegalStateException.class));
+    }
+
+    @Test
+    public void testAreExceptionsPresentInChain3() {
+        assertTrue(Exceptions.areExceptionsPresentInChain(new IllegalArgumentException(new IllegalStateException()), IllegalStateException.class));
+    }
+
+    @Test
+    public void testAreExceptionsPresentInChain4() {
+        assertTrue(Exceptions.areExceptionsPresentInChain(new IllegalArgumentException(new IllegalStateException()), UnsupportedOperationException.class, IllegalStateException.class));
+    }
+
+    @Test
+    public void testAreExceptionsPresentInChain5() {
+        assertFalse(Exceptions.areExceptionsPresentInChain(new IllegalArgumentException(new IllegalArgumentException()), IllegalStateException.class));
+    }
+
 }
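
Note on the cause-chain check above: the following is a standalone sketch of the same idea, walking getCause() and testing each throwable against a set of types. The class name ExceptionChainSketch and the method name anyInChain are hypothetical, and the bounded generic signature is a variation chosen here for illustration; the committed method uses raw Class varargs.

```java
import java.io.IOException;

/** Hypothetical standalone version of a cause-chain type check. */
final class ExceptionChainSketch {

    /** Returns true if the throwable or any of its causes is an instance of one of the given types. */
    @SafeVarargs
    static boolean anyInChain(Throwable error, Class<? extends Throwable>... types) {
        // Walk from the outermost throwable down through its causes.
        for (Throwable t = error; t != null; t = t.getCause()) {
            for (Class<? extends Throwable> type : types) {
                if (type.isInstance(t)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException(new IOException("disk"));
        System.out.println(anyInChain(wrapped, IOException.class));            // true
        System.out.println(anyInChain(wrapped, IllegalStateException.class));  // false
        System.out.println(anyInChain(null, IOException.class));               // false
    }
}
```

A caller can use such a check to log only a short message, without a stack trace, when an expected exception type appears anywhere in the chain.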

[pulsar] 01/10: [PIP-60] Add TLS SNI support for cpp and python clients (#8957)

Posted by yo...@apache.org.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 05e1ccd4dc3c76b62861ec44d657273a7e5e4319
Author: Deon van der Vyver <de...@gmail.com>
AuthorDate: Thu Dec 24 08:12:45 2020 +0100

    [PIP-60] Add TLS SNI support for cpp and python clients (#8957)
    
    * Add TLS SNI support for cpp and python clients
    
    (cherry picked from commit f018892825870e7852c7c1c1377177b5c38e9044)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index d17a9b6..3442c89 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -183,6 +183,8 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
 #else
         boost::asio::ssl::context ctx(executor_->io_service_, boost::asio::ssl::context::tlsv1_client);
 #endif
+        Url serviceUrl;
+        Url::parse(physicalAddress, serviceUrl);
         if (clientConfiguration.isTlsAllowInsecureConnection()) {
             ctx.set_verify_mode(boost::asio::ssl::context::verify_none);
             isTlsAllowInsecureConnection_ = true;
@@ -190,9 +192,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
             ctx.set_verify_mode(boost::asio::ssl::context::verify_peer);
 
             if (clientConfiguration.isValidateHostName()) {
-                Url service_url;
-                Url::parse(physicalAddress, service_url);
-                LOG_DEBUG("Validating hostname for " << service_url.host() << ":" << service_url.port());
+                LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
                 ctx.set_verify_callback(boost::asio::ssl::rfc2818_verification(physicalAddress));
             }
 
@@ -239,6 +239,14 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
         }
 
         tlsSocket_ = executor_->createTlsSocket(socket_, ctx);
+
+        LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
+        if (!SSL_set_tlsext_host_name(tlsSocket_->native_handle(), serviceUrl.host().c_str())) {
+            boost::system::error_code ec{static_cast<int>(::ERR_get_error()),
+                                         boost::asio::error::get_ssl_category()};
+            LOG_ERROR(boost::system::system_error{ec}.what() << ": Error while setting TLS SNI");
+            return;
+        }
     }
 }
 

[pulsar] 09/10: [flaky test] Fix unit tests that occasionally fail (#9226)

Posted by yo...@apache.org.

yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 38fae22588cc0041ad6794e6dd9d275ddd31f8bd
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Wed Jan 20 01:02:52 2021 +0800

    [flaky test] Fix unit tests that occasionally fail (#9226)
    
    * fix unit test
    
    (cherry picked from commit 63acd204914734c7098e5b7fb9e6fb3ee37810a7)
---
 .../broker/admin/IncrementPartitionsTest.java      |  13 ++-
 .../pulsar/broker/admin/TopicMessageTTLTest.java   |   1 +
 .../pulsar/broker/admin/v1/V1_AdminApiTest2.java   |   1 +
 .../broker/service/ConsumedLedgersTrimTest.java    |   7 +-
 .../service/MessagePublishBufferThrottleTest.java  |   1 +
 .../broker/service/ReplicatorRateLimiterTest.java  |   3 +
 .../pulsar/broker/service/ReplicatorTestBase.java  | 129 +++++++++++++--------
 .../service/persistent/TopicDuplicationTest.java   |  28 ++---
 8 files changed, 113 insertions(+), 70 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
index 84fa300..0f78afa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.admin;
 
 import static org.testng.Assert.assertEquals;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.Consumer;
@@ -30,6 +32,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -119,13 +122,17 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest {
                 .create();
 
         admin.topics().updatePartitionedTopic(partitionedTopicName, 2);
-        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2);
+        //zk update takes some time
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2));
 
         admin.topics().updatePartitionedTopic(partitionedTopicName, 10);
-        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10));
 
         admin.topics().updatePartitionedTopic(partitionedTopicName, 20);
-        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() ->
+                assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20));
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
index 7bb93d7..e0eb52b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java
@@ -49,6 +49,7 @@ public class TopicMessageTTLTest extends MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
+        resetConfig();
         this.conf.setSystemTopicEnabled(true);
         this.conf.setTopicLevelPoliciesEnabled(true);
         this.conf.setTtlDurationDefaultInSeconds(3600);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
index e797eb8..6d4148e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
@@ -83,6 +83,7 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     public void setup() throws Exception {
+        resetConfig();
         conf.setLoadBalancerEnabled(true);
         super.internalSetup();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
index cc84de2..c06126b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java
@@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.MessageId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 
 public class ConsumedLedgersTrimTest extends BrokerTestBase {
 
@@ -46,9 +48,10 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
         //No-op
     }
 
+    @AfterMethod
     @Override
     protected void cleanup() throws Exception {
-        //No-op
+        super.internalCleanup();
     }
 
     @Test
@@ -102,7 +105,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase {
 
 
     @Test
-    public void TestConsumedLedgersTrimNoSubscriptions() throws Exception {
+    public void testConsumedLedgersTrimNoSubscriptions() throws Exception {
         conf.setRetentionCheckIntervalInSeconds(1);
         conf.setBrokerDeleteInactiveTopicsEnabled(false);
         super.baseSetup();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index 6dd4515..75248f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -43,6 +43,7 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
+        resetConfig();
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
index 4c04ccf..3cb2247 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java
@@ -63,6 +63,9 @@ public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
     @AfterClass(alwaysRun = true, timeOut = 300000)
     void shutdown() throws Exception {
         super.shutdown();
+        resetConfig1();
+        resetConfig2();
+        resetConfig3();
     }
 
     enum DispatchRateType {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index 9f7ef09..38ed451 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -110,23 +110,7 @@ public class ReplicatorTestBase {
         // NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have
         // completely
         // independent config objects instead of referring to the same properties object
-        config1.setClusterName("r1");
-        config1.setAdvertisedAddress("localhost");
-        config1.setWebServicePort(Optional.of(0));
-        config1.setWebServicePortTls(Optional.of(0));
-        config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort());
-        config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
-        config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
-        config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
-                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
-        config1.setBrokerServicePort(Optional.of(0));
-        config1.setBrokerServicePortTls(Optional.of(0));
-        config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
-        config1.setDefaultNumberOfNamespaceBundles(1);
-        config1.setAllowAutoTopicCreationType("non-partitioned");
+        setConfig1DefaultValue();
         pulsar1 = new PulsarService(config1);
         pulsar1.start();
         ns1 = pulsar1.getBrokerService();
@@ -141,23 +125,7 @@ public class ReplicatorTestBase {
         bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
         bkEnsemble2.start();
 
-        config2.setClusterName("r2");
-        config2.setAdvertisedAddress("localhost");
-        config2.setWebServicePort(Optional.of(0));
-        config2.setWebServicePortTls(Optional.of(0));
-        config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort());
-        config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
-        config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
-        config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
-                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
-        config2.setBrokerServicePort(Optional.of(0));
-        config2.setBrokerServicePortTls(Optional.of(0));
-        config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
-        config2.setDefaultNumberOfNamespaceBundles(1);
-        config2.setAllowAutoTopicCreationType("non-partitioned");
+        setConfig2DefaultValue();
         pulsar2 = new PulsarService(config2);
         pulsar2.start();
         ns2 = pulsar2.getBrokerService();
@@ -172,23 +140,7 @@ public class ReplicatorTestBase {
         bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
         bkEnsemble3.start();
 
-        config3.setClusterName("r3");
-        config3.setAdvertisedAddress("localhost");
-        config3.setWebServicePort(Optional.of(0));
-        config3.setWebServicePortTls(Optional.of(0));
-        config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort());
-        config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
-        config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
-        config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
-                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
-        config3.setBrokerServicePort(Optional.of(0));
-        config3.setBrokerServicePortTls(Optional.of(0));
-        config3.setTlsEnabled(true);
-        config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
-        config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
-        config3.setDefaultNumberOfNamespaceBundles(1);
-        config3.setAllowAutoTopicCreationType("non-partitioned");
+        setConfig3DefaultValue();
         pulsar3 = new PulsarService(config3);
         pulsar3.start();
         ns3 = pulsar3.getBrokerService();
@@ -227,6 +179,81 @@ public class ReplicatorTestBase {
 
     }
 
+    private void setConfig3DefaultValue() {
+        config3.setClusterName("r3");
+        config3.setAdvertisedAddress("localhost");
+        config3.setWebServicePort(Optional.of(0));
+        config3.setWebServicePortTls(Optional.of(0));
+        config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort());
+        config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
+        config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
+        config3.setBrokerDeleteInactiveTopicsFrequencySeconds(
+                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+        config3.setBrokerServicePort(Optional.of(0));
+        config3.setBrokerServicePortTls(Optional.of(0));
+        config3.setTlsEnabled(true);
+        config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config3.setDefaultNumberOfNamespaceBundles(1);
+        config3.setAllowAutoTopicCreationType("non-partitioned");
+    }
+
+    public void setConfig1DefaultValue(){
+        config1.setClusterName("r1");
+        config1.setAdvertisedAddress("localhost");
+        config1.setWebServicePort(Optional.of(0));
+        config1.setWebServicePortTls(Optional.of(0));
+        config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort());
+        config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
+        config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
+        config1.setBrokerDeleteInactiveTopicsFrequencySeconds(
+                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+        config1.setBrokerServicePort(Optional.of(0));
+        config1.setBrokerServicePortTls(Optional.of(0));
+        config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+        config1.setDefaultNumberOfNamespaceBundles(1);
+        config1.setAllowAutoTopicCreationType("non-partitioned");
+    }
+
+    public void setConfig2DefaultValue() {
+        config2.setClusterName("r2");
+        config2.setAdvertisedAddress("localhost");
+        config2.setWebServicePort(Optional.of(0));
+        config2.setWebServicePortTls(Optional.of(0));
+        config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort());
+        config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo");
+        config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
+        config2.setBrokerDeleteInactiveTopicsFrequencySeconds(
+                inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
+        config2.setBrokerServicePort(Optional.of(0));
+        config2.setBrokerServicePortTls(Optional.of(0));
+        config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+        config2.setDefaultNumberOfNamespaceBundles(1);
+        config2.setAllowAutoTopicCreationType("non-partitioned");
+    }
+
+    public void resetConfig1() {
+        config1 = new ServiceConfiguration();
+        setConfig1DefaultValue();
+    }
+
+    public void resetConfig2() {
+        config2 = new ServiceConfiguration();
+        setConfig2DefaultValue();
+    }
+
+    public void resetConfig3() {
+        config3 = new ServiceConfiguration();
+        setConfig3DefaultValue();
+    }
+
     private int inSec(int time, TimeUnit unit) {
         return (int) TimeUnit.SECONDS.convert(time, unit);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
index 397a6a5..fdea264 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java
@@ -106,13 +106,13 @@ public class TopicDuplicationTest extends ProducerConsumerBase {
     }
 
     @Test(timeOut = 30000)
-    private void testTopicPolicyTakeSnapshot() throws Exception {
+    public void testTopicPolicyTakeSnapshot() throws Exception {
         resetConfig();
         conf.setSystemTopicEnabled(true);
         conf.setTopicLevelPoliciesEnabled(true);
         conf.setBrokerDeduplicationEnabled(true);
         conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
-        conf.setBrokerDeduplicationSnapshotIntervalSeconds(5);
+        conf.setBrokerDeduplicationSnapshotIntervalSeconds(7);
         conf.setBrokerDeduplicationEntriesInterval(20000);
         super.internalCleanup();
         super.internalSetup();
@@ -124,10 +124,10 @@ public class TopicDuplicationTest extends ProducerConsumerBase {
         Producer<String> producer = pulsarClient
                 .newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create();
         waitCacheInit(topicName);
-        admin.topics().setDeduplicationSnapshotInterval(topicName, 1);
-        admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 2);
+        admin.topics().setDeduplicationSnapshotInterval(topicName, 3);
+        admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 5);
 
-        int msgNum = 50;
+        int msgNum = 10;
         CountDownLatch countDownLatch = new CountDownLatch(msgNum);
         for (int i = 0; i < msgNum; i++) {
             producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown());
@@ -139,19 +139,19 @@ public class TopicDuplicationTest extends ProducerConsumerBase {
                 .getManagedLedger().getLastConfirmedEntry();
         assertEquals(seqId, msgNum - 1);
         assertEquals(position.getEntryId(), msgNum - 1);
-        //The first time, use topic-leve policies, 1 second delay + 1 second interval
-        Awaitility.await().atMost(2100, TimeUnit.MILLISECONDS)
+        //The first time, use topic-leve policies, 1 second delay + 3 second interval
+        Awaitility.await().atMost(5000, TimeUnit.MILLISECONDS)
                 .until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
                         .getMarkDeletedPosition()).getEntryId() == msgNum - 1);
         ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
         PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
         assertEquals(position, markDeletedPosition);
 
-        //remove topic-level policies, namespace-level should be used, interval becomes 2 seconds
+        //remove topic-level policies, namespace-level should be used, interval becomes 5 seconds
         admin.topics().removeDeduplicationSnapshotInterval(topicName);
         producer.newMessage().value("msg").send();
-        //zk update time + interval time
-        Awaitility.await().atMost( 3000, TimeUnit.MILLISECONDS)
+        //zk update time + 5 second interval time
+        Awaitility.await().atMost( 7, TimeUnit.SECONDS)
                 .until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
                         .getMarkDeletedPosition()).getEntryId() == msgNum);
         markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
@@ -159,20 +159,20 @@ public class TopicDuplicationTest extends ProducerConsumerBase {
         assertEquals(msgNum, markDeletedPosition.getEntryId());
         assertEquals(position, markDeletedPosition);
 
-        //4 remove namespace-level policies, broker-level should be used, interval becomes 2 seconds
+        //4 remove namespace-level policies, broker-level should be used, interval becomes 3 seconds
         admin.namespaces().removeDeduplicationSnapshotInterval(myNamespace);
-        Awaitility.await().atMost(2, TimeUnit.SECONDS)
+        Awaitility.await().atMost(4, TimeUnit.SECONDS)
                 .until(() -> (admin.namespaces().getDeduplicationSnapshotInterval(myNamespace) == null));
         producer.newMessage().value("msg").send();
         //ensure that the time exceeds the scheduling interval of ns and topic, but no snapshot is generated
         Thread.sleep(3000);
         markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
         position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
-        // broker-level interval is 5 seconds, so 3 seconds will not take a snapshot
+        // broker-level interval is 7 seconds, so 3 seconds will not take a snapshot
         assertNotEquals(msgNum + 1, markDeletedPosition.getEntryId());
         assertNotEquals(position, markDeletedPosition);
         // wait for scheduler
-        Awaitility.await().atMost(3, TimeUnit.SECONDS)
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
                 .until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor()
                         .getMarkDeletedPosition()).getEntryId() == msgNum + 1);
         markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();

[pulsar] 07/10: Fix 8115 Some partitions get stuck after adding additional consumers to the KEY_SHARED subscriptions (#10096)

Posted by yo...@apache.org.
yong pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f3b5ff73608b0143756f68ee1c178794bd8b535c
Author: baomingyu <ba...@163.com>
AuthorDate: Thu Apr 8 09:51:39 2021 +0800

    Fix 8115 Some partitions get stuck after adding additional consumers to the KEY_SHARED subscriptions (#10096)
    
    Fixes #8115
    
    Master Issue: #8115
    
    First point:
     Sometimes the send does not succeed, so the condition below is never true and readMoreEntries is not called:
    ` if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
                            readMoreEntries();
     } `
    
    Second point:
      Sometimes keyNumbers is never decremented to zero, so the broker does not start the next loop to readMoreEntries.
    Some partitions of the topic get stuck and stop pushing messages to consumers, even though the consumers still have permits.
    
    (cherry picked from commit c4f154e79c03cff9055aa4e2ede7748c5952f2bc)
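
The first point can be illustrated with a minimal, self-contained sketch. It is not the broker code: it assumes plain `java.util.concurrent.CompletableFuture` in place of the Netty futures returned by `consumer.sendMessages`, and the class name `CompletionCountdown`, the method `run`, and the counter `readMoreCalls` are hypothetical names chosen for the illustration. A shared counter starts at the number of keys; if only successful sends decrement it, a single failed send keeps it above zero and the "read more" hook never fires. Counting every completion, as the patch does by checking `isDone()` inside the listener, lets the counter reach zero.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletionCountdown {

    /**
     * Simulates three sends, the second of which fails.
     * Returns how many times the stand-in for readMoreEntries() fired.
     */
    static int run(boolean countOnlySuccess) {
        int keys = 3;
        AtomicInteger remaining = new AtomicInteger(keys);   // plays the role of keyNumbers
        AtomicInteger readMoreCalls = new AtomicInteger();   // hypothetical stand-in for readMoreEntries()

        List<CompletableFuture<Void>> sends = new ArrayList<>();
        for (int i = 0; i < keys; i++) {
            CompletableFuture<Void> send = new CompletableFuture<>();
            // The callback runs once the send completes, successfully or not.
            send.whenComplete((ok, err) -> {
                boolean counts = !countOnlySuccess || err == null;
                if (counts && remaining.decrementAndGet() == 0) {
                    readMoreCalls.incrementAndGet();
                }
            });
            sends.add(send);
        }

        sends.get(0).complete(null);
        sends.get(1).completeExceptionally(new RuntimeException("simulated send failure"));
        sends.get(2).complete(null);
        return readMoreCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("success-only countdown fired: " + run(true));
        System.out.println("any-completion countdown fired: " + run(false));
    }
}
```

With the success-only check the counter stops at 1 and the hook is never invoked; with the completion-based check it reaches 0 and the hook is invoked once.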
---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java   | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index d7d08d1..ca078ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -29,6 +29,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.mledger.Entry;
@@ -172,6 +173,10 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
         AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
 
+        int currentThreadKeyNumber = groupedEntries.size();
+        if (currentThreadKeyNumber == 0) {
+            currentThreadKeyNumber = -1;
+        }
         for (Map.Entry<Consumer, List<Entry>> current : groupedEntries.entrySet()) {
             Consumer consumer = current.getKey();
             List<Entry> entriesWithSameKey = current.getValue();
@@ -213,7 +218,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
                         getRedeliveryTracker()).addListener(future -> {
-                            if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
+                            if (future.isDone() && keyNumbers.decrementAndGet() == 0) {
                                 readMoreEntries();
                             }
                         });
@@ -221,6 +226,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
                 totalMessagesSent += sendMessageInfo.getTotalMessages();
                 totalBytesSent += sendMessageInfo.getTotalBytes();
+            } else {
+                currentThreadKeyNumber = keyNumbers.decrementAndGet();
             }
         }
 
@@ -257,6 +264,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             }
             // readMoreEntries should run regardless whether or not stuck is caused by stuckConsumers for avoid stopping dispatch.
             readMoreEntries();
+        }  else if (currentThreadKeyNumber == 0) {
+            topic.getBrokerService().executor().schedule(() -> {
+                synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
+                    readMoreEntries();
+                }
+            }, 100, TimeUnit.MILLISECONDS);
         }
     }