You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/07/07 05:31:24 UTC

[pulsar] branch branch-2.8 updated (14a731e -> aa290ae)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 14a731e  Fixed possible deadlock in the initialization of MLTransactionLog (#11194)
     new 1663338  Make the compaction phase one loop timeout configurable (#11206)
     new 1b3582c  Remove unused listener to reduce creating executor pool. (#11215)
     new 01f1596   [Issue 11067][pulsar-client] Fix bin/pulsar-client produce not supporting v2 topic name through websocket (#11069)
     new 0b27235  [Broker] Fix broker dispatch byte rate limiter. (#11135)
     new aa290ae  Fix init WebSocketService with ClusterData (#11234)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/broker.conf                                   |   4 +
 .../apache/pulsar/broker/ServiceConfiguration.java |   7 +
 .../broker/service/AbstractBaseDispatcher.java     |  19 ++-
 .../AbstractDispatcherMultipleConsumers.java       |   5 +-
 .../AbstractDispatcherSingleActiveConsumer.java    |   6 +-
 .../NonPersistentDispatcherMultipleConsumers.java  |   5 +-
 ...onPersistentDispatcherSingleActiveConsumer.java |   6 +-
 .../service/persistent/DispatchRateLimiter.java    |   9 ++
 .../PersistentDispatcherMultipleConsumers.java     |  50 ++++---
 .../PersistentDispatcherSingleActiveConsumer.java  |  49 ++++---
 ...istentStreamingDispatcherMultipleConsumers.java |  10 +-
 ...entStreamingDispatcherSingleActiveConsumer.java |  11 +-
 .../streamingdispatch/StreamingEntryReader.java    |   4 +-
 .../pulsar/compaction/TwoPhaseCompactor.java       |   9 +-
 .../client/api/MessageDispatchThrottlingTest.java  |  42 ++----
 .../apache/pulsar/compaction/CompactorTest.java    |  14 ++
 .../org/apache/pulsar/client/cli/CmdProduce.java   |  23 ++-
 .../apache/pulsar/client/cli/TestCmdProduce.java   |  32 ++---
 .../pulsar/proxy/server/ProxyServiceStarter.java   | 117 +++++++++------
 .../proxy/server/ProxyServiceStarterTest.java      | 160 +++++++++++++++++++++
 .../src/test/resources}/proxy.conf                 |   4 +-
 .../impl/FileSystemManagedLedgerOffloader.java     |   3 +-
 22 files changed, 425 insertions(+), 164 deletions(-)
 copy pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/executor/MockClockTest.java => pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java (53%)
 create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
 copy {conf => pulsar-proxy/src/test/resources}/proxy.conf (99%)

[pulsar] 05/05: Fix init WebSocketService with ClusterData (#11234)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit aa290aee04b77954f552e3929fc1e6b7608c6a21
Author: Tboy <te...@apache.org>
AuthorDate: Wed Jul 7 07:45:03 2021 +0800

    Fix init WebSocketService with ClusterData (#11234)
    
    1. Fix init WebSocketService with ClusterData.
    2. Refactor ProxyServiceStarter to be easier for test.
    3. Add test case for ProxyServiceStarter.
    
    (cherry picked from commit 93e145bea96c88a24d33cf65e657e86ac4e4c491)
---
 .../pulsar/proxy/server/ProxyServiceStarter.java   | 117 ++++++----
 .../proxy/server/ProxyServiceStarterTest.java      | 160 ++++++++++++++
 pulsar-proxy/src/test/resources/proxy.conf         | 236 +++++++++++++++++++++
 3 files changed, 470 insertions(+), 43 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
index 225016c..8f868dc 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java
@@ -21,14 +21,17 @@ package org.apache.pulsar.proxy.server;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
 import static org.slf4j.bridge.SLF4JBridgeHandler.install;
 import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
+import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
 import org.apache.pulsar.websocket.WebSocketPingPongServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
@@ -82,6 +85,8 @@ public class ProxyServiceStarter {
     @Parameter(names = { "-h", "--help" }, description = "Show this help message")
     private boolean help = false;
 
+    private ProxyConfiguration config;
+
     public ProxyServiceStarter(String[] args) throws Exception {
         try {
 
@@ -108,7 +113,7 @@ public class ProxyServiceStarter {
             }
 
             // load config file
-            final ProxyConfiguration config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class);
+            config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class);
 
             if (!isBlank(zookeeperServers)) {
                 // Use zookeeperServers from command line
@@ -136,55 +141,58 @@ public class ProxyServiceStarter {
                 checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
             }
 
-            AuthenticationService authenticationService = new AuthenticationService(
-                    PulsarConfigurationLoader.convertFrom(config));
-            // create proxy service
-            ProxyService proxyService = new ProxyService(config, authenticationService);
-            // create a web-service
-            final WebServer server = new WebServer(config, authenticationService);
-
-            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-                try {
-                    proxyService.close();
-                    server.stop();
-                } catch (Exception e) {
-                    log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
-                }
-            }));
+        } catch (Exception e) {
+            log.error("Failed to start pulsar proxy service. error msg " + e.getMessage(), e);
+            throw new PulsarServerException(e);
+        }
+    }
 
-            proxyService.start();
+    public static void main(String[] args) throws Exception {
+        ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args);
+        serviceStarter.start();
+    }
 
-            // Setup metrics
-            DefaultExports.initialize();
+    public void start() throws Exception {
+        AuthenticationService authenticationService = new AuthenticationService(
+                PulsarConfigurationLoader.convertFrom(config));
+        // create proxy service
+        ProxyService proxyService = new ProxyService(config, authenticationService);
+        // create a web-service
+        final WebServer server = new WebServer(config, authenticationService);
 
-            // Report direct memory from Netty counters
-            Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() {
-                @Override
-                public double get() {
-                    return getJvmDirectMemoryUsed();
-                }
-            }).register(CollectorRegistry.defaultRegistry);
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            try {
+                proxyService.close();
+                server.stop();
+            } catch (Exception e) {
+                log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
+            }
+        }));
 
-            Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
-                @Override
-                public double get() {
-                    return PlatformDependent.maxDirectMemory();
-                }
-            }).register(CollectorRegistry.defaultRegistry);
+        proxyService.start();
 
-            addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider());
+        // Setup metrics
+        DefaultExports.initialize();
 
-            // start web-service
-            server.start();
+        // Report direct memory from Netty counters
+        Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() {
+            @Override
+            public double get() {
+                return getJvmDirectMemoryUsed();
+            }
+        }).register(CollectorRegistry.defaultRegistry);
 
-        } catch (Exception e) {
-            log.error("Failed to start pulsar proxy service. error msg " + e.getMessage(), e);
-            throw new PulsarServerException(e);
-        }
-    }
+        Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
+            @Override
+            public double get() {
+                return PlatformDependent.maxDirectMemory();
+            }
+        }).register(CollectorRegistry.defaultRegistry);
 
-    public static void main(String[] args) throws Exception {
-        new ProxyServiceStarter(args);
+        addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider());
+
+        // start web-service
+        server.start();
     }
 
     public static void addWebServerHandlers(WebServer server,
@@ -225,7 +233,7 @@ public class ProxyServiceStarter {
         if (config.isWebSocketServiceEnabled()) {
             // add WebSocket servlet
             // Use local broker address to avoid different IP address when using a VIP for service discovery
-            WebSocketService webSocketService = new WebSocketService(null, PulsarConfigurationLoader.convertFrom(config));
+            WebSocketService webSocketService = new WebSocketService(createClusterData(config), PulsarConfigurationLoader.convertFrom(config));
             webSocketService.start();
             final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
             server.addServlet(WebSocketProducerServlet.SERVLET_PATH,
@@ -253,6 +261,29 @@ public class ProxyServiceStarter {
         }
     }
 
+    private static ClusterData createClusterData(ProxyConfiguration config) {
+        if (isNotBlank(config.getBrokerServiceURL()) || isNotBlank(config.getBrokerServiceURLTLS())) {
+            return ClusterData.builder()
+                    .serviceUrl(config.getBrokerWebServiceURL())
+                    .serviceUrlTls(config.getBrokerWebServiceURLTLS())
+                    .brokerServiceUrl(config.getBrokerServiceURL())
+                    .brokerServiceUrlTls(config.getBrokerServiceURLTLS())
+                    .build();
+        } else if (isNotBlank(config.getBrokerWebServiceURL()) || isNotBlank(config.getBrokerWebServiceURLTLS())) {
+            return ClusterData.builder()
+                    .serviceUrl(config.getBrokerWebServiceURL())
+                    .serviceUrlTls(config.getBrokerWebServiceURLTLS())
+                    .build();
+        } else {
+            return null;
+        }
+    }
+
+    @VisibleForTesting
+    public ProxyConfiguration getConfig() {
+        return config;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ProxyServiceStarter.class);
 
 }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
new file mode 100644
index 0000000..63d7e39
--- /dev/null
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.proxy.server;
+
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.websocket.data.ProducerMessage;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
+import org.eclipse.jetty.websocket.client.WebSocketClient;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Base64;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Future;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class ProxyServiceStarterTest extends MockedPulsarServiceBaseTest {
+
+    @Override
+    @BeforeClass
+    protected void setup() throws Exception {
+        internalSetup();
+        String[] args = new String[]{"-c", "./src/test/resources/proxy.conf"};
+        ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args);
+        serviceStarter.getConfig().setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
+        serviceStarter.getConfig().setServicePort(Optional.of(11000));
+        serviceStarter.getConfig().setWebSocketServiceEnabled(true);
+        serviceStarter.start();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Test
+    public void testEnableWebSocketServer() throws Exception {
+        HttpClient httpClient = new HttpClient();
+        WebSocketClient webSocketClient = new WebSocketClient(httpClient);
+        webSocketClient.start();
+        MyWebSocket myWebSocket = new MyWebSocket();
+        String webSocketUri = "ws://localhost:8080/ws/pingpong";
+        Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
+        sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
+        assertTrue(myWebSocket.getResponse().contains("ping"));
+    }
+
+    @Test
+    public void testProducer() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:11000")
+                .build();
+
+        @Cleanup
+        Producer<byte[]> producer = client.newProducer()
+                .topic("persistent://sample/test/local/websocket-topic")
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("test".getBytes());
+        }
+    }
+
+    @Test
+    public void testProduceAndConsumeMessageWithWebsocket() throws Exception {
+        HttpClient producerClient = new HttpClient();
+        WebSocketClient producerWebSocketClient = new WebSocketClient(producerClient);
+        producerWebSocketClient.start();
+        MyWebSocket producerSocket = new MyWebSocket();
+        String produceUri = "ws://localhost:8080/ws/producer/persistent/sample/test/local/websocket-topic";
+        Future<Session> producerSession = producerWebSocketClient.connect(producerSocket, URI.create(produceUri));
+
+        ProducerMessage produceRequest = new ProducerMessage();
+        produceRequest.setContext("context");
+        produceRequest.setPayload(Base64.getEncoder().encodeToString("my payload".getBytes()));
+
+        HttpClient consumerClient = new HttpClient();
+        WebSocketClient consumerWebSocketClient = new WebSocketClient(consumerClient);
+        consumerWebSocketClient.start();
+        MyWebSocket consumerSocket = new MyWebSocket();
+        String consumeUri = "ws://localhost:8080/ws/consumer/persistent/sample/test/local/websocket-topic/my-sub";
+        Future<Session> consumerSession = consumerWebSocketClient.connect(consumerSocket, URI.create(consumeUri));
+        consumerSession.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
+        producerSession.get().getRemote().sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(produceRequest));
+        assertTrue(consumerSocket.getResponse().contains("ping"));
+        ProducerMessage message = ObjectMapperFactory.getThreadLocal().readValue(consumerSocket.getResponse(), ProducerMessage.class);
+        assertEquals(new String(Base64.getDecoder().decode(message.getPayload())), "my payload");
+    }
+
+    @WebSocket
+    public static class MyWebSocket extends WebSocketAdapter implements WebSocketPingPongListener {
+
+        ArrayBlockingQueue<String> incomingMessages = new ArrayBlockingQueue<>(10);
+
+        @Override
+        public void onWebSocketText(String message) {
+            incomingMessages.add(message);
+        }
+
+        @Override
+        public void onWebSocketClose(int i, String s) {
+        }
+
+        @Override
+        public void onWebSocketConnect(Session session) {
+        }
+
+        @Override
+        public void onWebSocketError(Throwable throwable) {
+        }
+
+        @Override
+        public void onWebSocketPing(ByteBuffer payload) {
+        }
+
+        @Override
+        public void onWebSocketPong(ByteBuffer payload) {
+            incomingMessages.add(BufferUtil.toDetailString(payload));
+        }
+
+        public String getResponse() throws InterruptedException {
+            return incomingMessages.take();
+        }
+    }
+
+}
diff --git a/pulsar-proxy/src/test/resources/proxy.conf b/pulsar-proxy/src/test/resources/proxy.conf
new file mode 100644
index 0000000..b5ed33f
--- /dev/null
+++ b/pulsar-proxy/src/test/resources/proxy.conf
@@ -0,0 +1,236 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+### --- Broker Discovery --- ###
+
+# The ZooKeeper quorum connection string (as a comma-separated list)
+zookeeperServers=
+
+# Configuration store connection string (as a comma-separated list)
+configurationStoreServers=
+
+# if Service Discovery is Disabled this url should point to the discovery service provider.
+brokerServiceURL=pulsar://0.0.0.0:0
+brokerServiceURLTLS=
+
+# These settings are unnecessary if `zookeeperServers` is specified
+brokerWebServiceURL=http://0.0.0.0:0
+brokerWebServiceURLTLS=
+
+# If function workers are setup in a separate cluster, configure the following 2 settings
+# to point to the function workers cluster
+functionWorkerWebServiceURL=
+functionWorkerWebServiceURLTLS=
+
+# ZooKeeper session timeout (in milliseconds)
+zookeeperSessionTimeoutMs=30000
+
+# ZooKeeper cache expiry time in seconds
+zooKeeperCacheExpirySeconds=300
+
+### --- Server --- ###
+
+# Hostname or IP address the service binds on, default is 0.0.0.0.
+bindAddress=0.0.0.0
+
+# Hostname or IP address the service advertises to the outside world.
+# If not set, the value of `InetAddress.getLocalHost().getHostname()` is used.
+advertisedAddress=
+
+# Enable or disable the HAProxy protocol.
+haProxyProtocolEnabled=false
+
+# The port to use for server binary Protobuf requests
+servicePort=6650
+
+# The port to use to server binary Protobuf TLS requests
+servicePortTls=
+
+# Port that discovery service listen on
+webServicePort=8080
+
+# Port to use to server HTTPS request
+webServicePortTls=
+
+# Path for the file used to determine the rotation status for the proxy instance when responding
+# to service discovery health checks
+statusFilePath=
+
+# Proxy log level, default is 0.
+# 0: Do not log any tcp channel info
+# 1: Parse and log any tcp channel info and command info without message body
+# 2: Parse and log channel info, command info and message body
+proxyLogLevel=0
+
+### ---Authorization --- ###
+
+# Role names that are treated as "super-users," meaning that they will be able to perform all admin
+# operations and publish/consume to/from all topics (as a comma-separated list)
+superUserRoles=
+
+# Whether authorization is enforced by the Pulsar proxy
+authorizationEnabled=false
+
+# Authorization provider as a fully qualified class name
+authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
+
+# Whether client authorization credentials are forwared to the broker for re-authorization.
+# Authentication must be enabled via authenticationEnabled=true for this to take effect.
+forwardAuthorizationCredentials=false
+
+### --- Authentication --- ###
+
+# Whether authentication is enabled for the Pulsar proxy
+authenticationEnabled=false
+
+# Authentication provider name list (a comma-separated list of class names)
+authenticationProviders=
+
+# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
+anonymousUserRole=
+
+### --- Client Authentication --- ###
+
+# The three brokerClient* authentication settings below are for the proxy itself and determine how it
+# authenticates with Pulsar brokers
+
+# The authentication plugin used by the Pulsar proxy to authenticate with Pulsar brokers
+brokerClientAuthenticationPlugin=
+
+# The authentication parameters used by the Pulsar proxy to authenticate with Pulsar brokers
+brokerClientAuthenticationParameters=
+
+# The path to trusted certificates used by the Pulsar proxy to authenticate with Pulsar brokers
+brokerClientTrustCertsFilePath=
+
+# Whether TLS is enabled when communicating with Pulsar brokers
+tlsEnabledWithBroker=false
+
+# Tls cert refresh duration in seconds (set 0 to check on every new connection)
+tlsCertRefreshCheckDurationSec=300
+
+##### --- Rate Limiting --- #####
+
+# Max concurrent inbound connections. The proxy will reject requests beyond that.
+maxConcurrentInboundConnections=10000
+
+# Max concurrent outbound connections. The proxy will error out requests beyond that.
+maxConcurrentLookupRequests=50000
+
+##### --- TLS --- #####
+
+# Deprecated - use servicePortTls and webServicePortTls instead
+tlsEnabledInProxy=false
+
+# Path for the TLS certificate file
+tlsCertificateFilePath=
+
+# Path for the TLS private key file
+tlsKeyFilePath=
+
+# Path for the trusted TLS certificate file.
+# This cert is used to verify that any certs presented by connecting clients
+# are signed by a certificate authority. If this verification
+# fails, then the certs are untrusted and the connections are dropped.
+tlsTrustCertsFilePath=
+
+# Accept untrusted TLS certificate from client.
+# If true, a client with a cert which cannot be verified with the
+# 'tlsTrustCertsFilePath' cert will allowed to connect to the server,
+# though the cert will not be used for client authentication.
+tlsAllowInsecureConnection=false
+
+# Whether the hostname is validated when the proxy creates a TLS connection with brokers
+tlsHostnameVerificationEnabled=false
+
+# Specify the tls protocols the broker will use to negotiate during TLS handshake
+# (a comma-separated list of protocol names).
+# Examples:- [TLSv1.3, TLSv1.2]
+tlsProtocols=
+
+# Specify the tls cipher the broker will use to negotiate during TLS Handshake
+# (a comma-separated list of ciphers).
+# Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
+tlsCiphers=
+
+# Whether client certificates are required for TLS. Connections are rejected if the client
+# certificate isn't trusted.
+tlsRequireTrustedClientCertOnConnect=false
+
+##### --- HTTP --- #####
+
+# Http directs to redirect to non-pulsar services.
+httpReverseProxyConfigs=
+
+# Http output buffer size. The amount of data that will be buffered for http requests
+# before it is flushed to the channel. A larger buffer size may result in higher http throughput
+# though it may take longer for the client to see data.
+# If using HTTP streaming via the reverse proxy, this should be set to the minimum value, 1,
+# so that clients see the data as soon as possible.
+httpOutputBufferSize=32768
+
+# Number of threads to use for HTTP requests processing. Default is
+# 2 * Runtime.getRuntime().availableProcessors()
+httpNumThreads=
+
+# Enable the enforcement of limits on the incoming HTTP requests
+httpRequestsLimitEnabled=false
+
+# Max HTTP requests per seconds allowed. The excess of requests will be rejected with HTTP code 429 (Too many requests)
+httpRequestsMaxPerSecond=100.0
+
+
+### --- Token Authentication Provider --- ###
+
+## Symmetric key
+# Configure the secret key to be used to validate auth tokens
+# The key can be specified like:
+# tokenSecretKey=data:;base64,xxxxxxxxx
+# tokenSecretKey=file:///my/secret.key    ( Note: key file must be DER-encoded )
+tokenSecretKey=
+
+## Asymmetric public/private key pair
+# Configure the public key to be used to validate auth tokens
+# The key can be specified like:
+# tokenPublicKey=data:;base64,xxxxxxxxx
+# tokenPublicKey=file:///my/public.key    ( Note: key file must be DER-encoded )
+tokenPublicKey=
+
+# The token "claim" that will be interpreted as the authentication "role" or "principal" by AuthenticationProviderToken (defaults to "sub" if blank)
+tokenAuthClaim=
+
+# The token audience "claim" name, e.g. "aud", that will be used to get the audience from token.
+# If not set, audience will not be verified.
+tokenAudienceClaim=
+
+# The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this.
+tokenAudience=
+
+### --- WebSocket config variables --- ###
+
+# Enable or disable the WebSocket servlet.
+webSocketServiceEnabled=false
+
+# Name of the cluster to which this broker belongs to
+clusterName=
+
+### --- Deprecated config variables --- ###
+
+# Deprecated. Use configurationStoreServers
+globalZookeeperServers=

[pulsar] 04/05: [Broker] Fix broker dispatch byte rate limiter. (#11135)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0b27235a051f240125b986750c753d3210be1a3c
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Jul 6 10:28:43 2021 +0800

    [Broker] Fix broker dispatch byte rate limiter. (#11135)
    
    ## Motivation
    fix https://github.com/apache/pulsar/issues/11044
    now dispatcher byte rate limit don't limit every cursor read. When cursor read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It will cause that  dispatcher read entries by `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time.
    
    (cherry picked from commit ce6be124c9c86a6e10604ff44dc817f5d6f13c0e)
---
 .../broker/service/AbstractBaseDispatcher.java     | 19 +++++++-
 .../AbstractDispatcherMultipleConsumers.java       |  5 ++-
 .../AbstractDispatcherSingleActiveConsumer.java    |  6 ++-
 .../NonPersistentDispatcherMultipleConsumers.java  |  5 +--
 ...onPersistentDispatcherSingleActiveConsumer.java |  6 +--
 .../service/persistent/DispatchRateLimiter.java    |  9 ++++
 .../PersistentDispatcherMultipleConsumers.java     | 50 ++++++++++++----------
 .../PersistentDispatcherSingleActiveConsumer.java  | 49 ++++++++++++---------
 ...istentStreamingDispatcherMultipleConsumers.java | 10 +++--
 ...entStreamingDispatcherSingleActiveConsumer.java | 11 +++--
 .../streamingdispatch/StreamingEntryReader.java    |  4 +-
 .../client/api/MessageDispatchThrottlingTest.java  | 42 +++++++-----------
 12 files changed, 122 insertions(+), 94 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index bb10df4..3646ae6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -47,8 +48,11 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
 
     protected final Subscription subscription;
 
-    protected AbstractBaseDispatcher(Subscription subscription) {
+    protected final ServiceConfiguration serviceConfig;
+
+    protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
         this.subscription = subscription;
+        this.serviceConfig = serviceConfig;
     }
 
     /**
@@ -234,6 +238,19 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
         // noop
     }
 
+    protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, int availablePermitsOnMsg,
+                                                           long bytesToRead, long availablePermitsOnByte) {
+        if (availablePermitsOnMsg > 0) {
+            messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg);
+        }
+
+        if (availablePermitsOnByte > 0) {
+            bytesToRead = Math.min(bytesToRead, availablePermitsOnByte);
+        }
+
+        return Pair.of(messagesToRead, bytesToRead);
+    }
+
     protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
         return Commands.peekStickyKey(metadataAndPayload, subscription.getTopicName(), subscription.getName());
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
index 2f6b9a6..ad98059 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
@@ -23,6 +23,7 @@ import com.carrotsearch.hppc.ObjectSet;
 import java.util.Random;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.slf4j.Logger;
@@ -46,8 +47,8 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi
 
     private Random random = new Random(42);
 
-    protected AbstractDispatcherMultipleConsumers(Subscription subscription) {
-        super(subscription);
+    protected AbstractDispatcherMultipleConsumers(Subscription subscription, ServiceConfiguration serviceConfig) {
+        super(subscription, serviceConfig);
     }
 
     public boolean isConsumerConnected() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index b142c51..e73daaa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
@@ -55,8 +56,9 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
     private volatile int isClosed = FALSE;
 
     public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
-            String topicName, Subscription subscription) {
-        super(subscription);
+                                                  String topicName, Subscription subscription,
+                                                  ServiceConfiguration serviceConfig) {
+        super(subscription, serviceConfig);
         this.topicName = topicName;
         this.consumers = new CopyOnWriteArrayList<>();
         this.partitionIndex = partitionIndex;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 1a19186..5688683 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -23,7 +23,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -56,16 +55,14 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
     @SuppressWarnings("unused")
     private volatile int totalAvailablePermits = 0;
 
-    private final ServiceConfiguration serviceConfig;
     private final RedeliveryTracker redeliveryTracker;
 
     public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
-        super(subscription);
+        super(subscription, topic.getBrokerService().pulsar().getConfiguration());
         this.topic = topic;
         this.subscription = subscription;
         this.name = topic.getName() + " / " + subscription.getName();
         this.msgDrop = new Rate();
-        this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
         this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index a720767..6094ab7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.service.nonpersistent;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
@@ -40,16 +39,15 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     private final NonPersistentTopic topic;
     private final Rate msgDrop;
     private final Subscription subscription;
-    private final ServiceConfiguration serviceConfig;
     private final RedeliveryTracker redeliveryTracker;
 
     public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
                                                        NonPersistentTopic topic, Subscription subscription) {
-        super(subscriptionType, partitionIndex, topic.getName(), subscription);
+        super(subscriptionType, partitionIndex, topic.getName(), subscription,
+                topic.getBrokerService().pulsar().getConfiguration());
         this.topic = topic;
         this.subscription = subscription;
         this.msgDrop = new Rate();
-        this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
         this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 8cf4427..994d274 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -76,6 +76,15 @@ public class DispatchRateLimiter {
     }
 
     /**
+     * returns available byte-permit if msg-dispatch-throttling is enabled else it returns -1.
+     *
+     * @return
+     */
+    public long getAvailableDispatchRateLimitOnByte() {
+        return dispatchRateLimiterOnByte == null ? -1 : dispatchRateLimiterOnByte.getAvailablePermits();
+    }
+
+    /**
      * It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed.
      *
      * @param msgPermits
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 3426355..58663f4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -38,7 +38,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -105,7 +105,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
             AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
                     "blockedDispatcherOnUnackedMsgs");
-    protected final ServiceConfiguration serviceConfig;
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
     protected enum ReadType {
@@ -114,8 +113,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
     public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
                                                  Subscription subscription) {
-        super(subscription);
-        this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
+        super(subscription, topic.getBrokerService().pulsar().getConfiguration());
         this.cursor = cursor;
         this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
         this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
@@ -223,9 +221,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
         int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
         if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0) {
-            int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits);
+            Pair<Integer, Long> calculateResult = calculateToRead(currentTotalAvailablePermits);
+            int messagesToRead = calculateResult.getLeft();
+            long bytesToRead = calculateResult.getRight();
 
-            if (-1 == messagesToRead) {
+            if (messagesToRead == -1 || bytesToRead == -1) {
                 // Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete.
                 return;
             }
@@ -262,8 +262,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                             consumerList.size());
                 }
                 havePendingRead = true;
-                cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
-                        this,
+                cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
                         ReadType.Normal, topic.getMaxReadPosition());
             } else {
                 log.debug("[{}] Cannot schedule next read until previous one is done", name);
@@ -275,8 +274,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         }
     }
 
-    protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
+    // left pair is messagesToRead, right pair is bytesToRead
+    protected Pair<Integer, Long> calculateToRead(int currentTotalAvailablePermits) {
         int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
+        long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();
 
         Consumer c = getRandomConsumer();
         // if turn on precise dispatcher flow control, adjust the record to read
@@ -309,13 +310,15 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                     }
                     topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
                             TimeUnit.MILLISECONDS);
-                    return -1;
+                    return Pair.of(-1, -1L);
                 } else {
-                    // if dispatch-rate is in msg then read only msg according to available permit
-                    long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
-                    if (availablePermitsOnMsg > 0) {
-                        messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
-                    }
+                    Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
+                            (int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+                            bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte());
+
+                    messagesToRead = calculateResult.getLeft();
+                    bytesToRead = calculateResult.getRight();
+
                 }
             }
 
@@ -330,13 +333,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                     }
                     topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
                             TimeUnit.MILLISECONDS);
-                    return -1;
+                    return Pair.of(-1, -1L);
                 } else {
-                    // if dispatch-rate is in msg then read only msg according to available permit
-                    long availablePermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
-                    if (availablePermitsOnMsg > 0) {
-                        messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
-                    }
+                    Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
+                            (int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
+                            bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
+
+                    messagesToRead = calculateResult.getLeft();
+                    bytesToRead = calculateResult.getRight();
                 }
             }
 
@@ -346,11 +350,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
             }
-            return -1;
+            return Pair.of(-1, -1L);
         }
 
         // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
-        return Math.max(messagesToRead, 1);
+        return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1));
     }
 
     protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 93c2d25..2bd94ff 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -34,7 +34,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadE
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
-import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
@@ -67,19 +67,18 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
     protected volatile int readBatchSize;
     protected final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
             1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
-    protected final ServiceConfiguration serviceConfig;
     private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
 
     private final RedeliveryTracker redeliveryTracker;
 
     public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
                                                     PersistentTopic topic, Subscription subscription) {
-        super(subscriptionType, partitionIndex, topic.getName(), subscription);
+        super(subscriptionType, partitionIndex, topic.getName(), subscription,
+                topic.getBrokerService().pulsar().getConfiguration());
         this.topic = topic;
         this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
                 : ""/* NonDurableCursor doesn't have name */);
         this.cursor = cursor;
-        this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
         this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
         this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
         this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
@@ -324,9 +323,11 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         }
 
         if (consumer.getAvailablePermits() > 0) {
-            int messagesToRead = calculateNumOfMessageToRead(consumer);
+            Pair<Integer, Long> calculateResult = calculateToRead(consumer);
+            int messagesToRead = calculateResult.getLeft();
+            long bytesToRead = calculateResult.getRight();
 
-            if (-1 == messagesToRead) {
+            if (-1 == messagesToRead || bytesToRead == -1) {
                 // Skip read as topic/dispatcher has exceed the dispatch rate.
                 return;
             }
@@ -340,7 +341,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                 topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
             } else {
                 cursor.asyncReadEntriesOrWait(messagesToRead,
-                        serviceConfig.getDispatcherMaxReadSizeBytes(), this, consumer, topic.getMaxReadPosition());
+                        bytesToRead, this, consumer, topic.getMaxReadPosition());
             }
         } else {
             if (log.isDebugEnabled()) {
@@ -349,7 +350,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         }
     }
 
-    protected int calculateNumOfMessageToRead(Consumer consumer) {
+    protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
         int availablePermits = consumer.getAvailablePermits();
         if (!consumer.isWritable()) {
             // If the connection is not currently writable, we issue the read request anyway, but for a single
@@ -360,6 +361,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
         }
 
         int messagesToRead = Math.min(availablePermits, readBatchSize);
+        long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();
         // if turn of precise dispatcher flow control, adjust the records to read
         if (consumer.isPreciseDispatcherFlowControl()) {
             int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
@@ -391,13 +393,16 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                             }
                         }
                     }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
-                    return -1;
+                    return Pair.of(-1, -1L);
                 } else {
-                    // if dispatch-rate is in msg then read only msg according to available permit
-                    long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
-                    if (availablePermitsOnMsg > 0) {
-                        messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
-                    }
+
+                    Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
+                            (int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+                            bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte());
+
+                    messagesToRead = calculateResult.getLeft();
+                    bytesToRead = calculateResult.getRight();
+
                 }
             }
 
@@ -421,19 +426,21 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                             }
                         }
                     }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
-                    return -1;
+                    return Pair.of(-1, -1L);
                 } else {
-                    // if dispatch-rate is in msg then read only msg according to available permit
-                    long subPermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
-                    if (subPermitsOnMsg > 0) {
-                        messagesToRead = Math.min(messagesToRead, (int) subPermitsOnMsg);
-                    }
+
+                    Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
+                            (int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
+                            bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
+
+                    messagesToRead = calculateResult.getLeft();
+                    bytesToRead = calculateResult.getRight();
                 }
             }
         }
 
         // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
-        return Math.max(messagesToRead, 1);
+        return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1));
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
index 666bb98..f7d47e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java
@@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest;
@@ -139,9 +140,10 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
         // totalAvailablePermits may be updated by other threads
         int currentTotalAvailablePermits = totalAvailablePermits;
         if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
-            int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits);
-
-            if (-1 == messagesToRead) {
+            Pair<Integer, Long> calculateResult = calculateToRead(currentTotalAvailablePermits);
+            int messagesToRead = calculateResult.getLeft();
+            long bytesToRead = calculateResult.getRight();
+            if (-1 == messagesToRead || bytesToRead == -1) {
                 // Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete.
                 return;
             }
@@ -178,7 +180,7 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
                             consumerList.size());
                 }
                 havePendingRead = true;
-                streamingEntryReader.asyncReadEntries(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
+                streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead,
                         ReadType.Normal);
             } else {
                 log.debug("[{}] Cannot schedule next read until previous one is done", name);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index b4e4ed3..82e8d6d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.util.SafeRun;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
@@ -179,9 +180,12 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
         }
 
         if (!havePendingRead && consumer.getAvailablePermits() > 0) {
-            int messagesToRead = calculateNumOfMessageToRead(consumer);
+            Pair<Integer, Long> calculateResult = calculateToRead(consumer);
+            int messagesToRead = calculateResult.getLeft();
+            long bytesToRead = calculateResult.getRight();
 
-            if (-1 == messagesToRead) {
+
+            if (-1 == messagesToRead || bytesToRead == -1) {
                 // Skip read as topic/dispatcher has exceed the dispatch rate.
                 return;
             }
@@ -195,8 +199,7 @@ public class PersistentStreamingDispatcherSingleActiveConsumer extends Persisten
             if (consumer.readCompacted()) {
                 topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
             } else {
-                streamingEntryReader.asyncReadEntries(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
-                        consumer);
+                streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
             }
         } else {
             if (log.isDebugEnabled()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
index 51356c9..12f5600 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -68,7 +68,7 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
     private static final AtomicReferenceFieldUpdater<StreamingEntryReader, State> STATE_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(StreamingEntryReader.class, State.class, "state");
 
-    private volatile int maxReadSizeByte;
+    private volatile long maxReadSizeByte;
 
     private final Backoff readFailureBackoff = new Backoff(10, TimeUnit.MILLISECONDS,
             1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
@@ -81,7 +81,7 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
      * @param maxReadSizeByte maximum byte will be read from ledger.
      * @param ctx Context send along with read request.
      */
-    public synchronized void asyncReadEntries(int numEntriesToRead, int maxReadSizeByte, Object ctx) {
+    public synchronized void asyncReadEntries(int numEntriesToRead, long maxReadSizeByte, Object ctx) {
         if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
             internalCancelReadRequests();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index fc3cfaf..f605028 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -413,12 +414,15 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
      */
     @Test(dataProvider = "subscriptions", timeOut = 5000)
     public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription) throws Exception {
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
         log.info("-- Starting {} test --", methodName);
 
         final String namespace = "my-property/throttling_ns";
         final String topicName = "persistent://" + namespace + "/throttlingAll";
+        final String subscriptionName = "my-subscriber-name";
 
-        final int byteRate = 100;
+        //
+        final int byteRate = 250;
         DispatchRate dispatchRate = DispatchRate.builder()
                 .dispatchThrottlingRateInMsg(-1)
                 .dispatchThrottlingRateInByte(byteRate)
@@ -426,47 +430,31 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
                 .build();
         admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
         admin.namespaces().setDispatchRate(namespace, dispatchRate);
+        admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
         // create producer and topic
-        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
-        boolean isMessageRateUpdate = false;
-        int retry = 5;
-        for (int i = 0; i < retry; i++) {
-            if (topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0) {
-                isMessageRateUpdate = true;
-                break;
-            } else {
-                if (i != retry - 1) {
-                    Thread.sleep(100);
-                }
-            }
-        }
-        Assert.assertTrue(isMessageRateUpdate);
+        Awaitility.await().until(() -> topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0);
         Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate);
 
         final int numProducedMessages = 20;
-        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
 
         final AtomicInteger totalReceived = new AtomicInteger(0);
 
-        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[99]);
+        }
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName)
                 .subscriptionType(subscription).messageListener((c1, msg) -> {
                     Assert.assertNotNull(msg, "Message cannot be null");
                     String receivedMessage = new String(msg.getData());
                     log.debug("Received message [{}] in the listener", receivedMessage);
                     totalReceived.incrementAndGet();
-                    latch.countDown();
                 }).subscribe();
-        // deactive cursors
-        deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger());
 
-        // Asynchronously produce messages
-        for (int i = 0; i < numProducedMessages; i++) {
-            producer.send(new byte[byteRate / 10]);
-        }
-
-        latch.await();
-        Assert.assertEquals(totalReceived.get(), numProducedMessages);
+        Awaitility.await().atLeast(3, TimeUnit.SECONDS)
+                .atMost(5, TimeUnit.SECONDS).until(() -> totalReceived.get() > 6 && totalReceived.get() < 10);
 
         consumer.close();
         producer.close();

[pulsar] 03/05: [Issue 11067][pulsar-client] Fix bin/pulsar-client produce not supporting v2 topic name through websocket (#11069)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 01f159646b00bdf86977bed693947ec07bfc9d78
Author: Jason918 <ja...@qq.com>
AuthorDate: Tue Jul 6 11:45:52 2021 +0800

     [Issue 11067][pulsar-client] Fix bin/pulsar-client produce not supporting v2 topic name through websocket (#11069)
    
    Fixes #11067
    
    
    ### Motivation
    
    Fix bin/pulsar-client produce not supporting v2 topic name through websocket.
    
    ### Modifications
    
    Add "v2" in websoket url if topicName is in v2 format.
    
    (cherry picked from commit b4780ffc85759c3b8bd924632ec0c0dc3359116e)
---
 .../org/apache/pulsar/client/cli/CmdProduce.java   | 23 +++++++++--
 .../apache/pulsar/client/cli/TestCmdProduce.java   | 44 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index 40b0b3d..326e221 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -24,6 +24,7 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.gson.JsonParseException;
@@ -265,14 +266,28 @@ public class CmdProduce {
     }
 
     @SuppressWarnings("deprecation")
+    @VisibleForTesting
+    public String getProduceBaseEndPoint(String topic) {
+        TopicName topicName = TopicName.get(topic);
+        String produceBaseEndPoint;
+        if (topicName.isV2()) {
+            String wsTopic = String.format("%s/%s/%s/%s", topicName.getDomain(), topicName.getTenant(),
+                    topicName.getNamespacePortion(), topicName.getLocalName());
+            produceBaseEndPoint = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/v2/producer/" + wsTopic;
+        } else {
+            String wsTopic = String.format("%s/%s/%s/%s/%s", topicName.getDomain(), topicName.getTenant(),
+                    topicName.getCluster(), topicName.getNamespacePortion(), topicName.getLocalName());
+            produceBaseEndPoint = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/producer/" + wsTopic;
+        }
+        return produceBaseEndPoint;
+    }
+
+    @SuppressWarnings("deprecation")
     private int publishToWebSocket(String topic) {
         int numMessagesSent = 0;
         int returnCode = 0;
 
-        TopicName topicName = TopicName.get(topic);
-        String wsTopic = String.format("%s/%s/"+(StringUtils.isEmpty(topicName.getCluster()) ? "" : topicName.getCluster()+"/")+"%s/%s", topicName.getDomain(),topicName.getTenant(),topicName.getNamespacePortion(),topicName.getLocalName()); 
-        String produceBaseEndPoint = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/producer/" + wsTopic;
-        URI produceUri = URI.create(produceBaseEndPoint);
+        URI produceUri = URI.create(getProduceBaseEndPoint(topic));
 
         WebSocketClient produceClient = new WebSocketClient(new SslContextFactory(true));
         ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
new file mode 100644
index 0000000..173fcfc
--- /dev/null
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.cli;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TestCmdProduce {
+
+    CmdProduce cmdProduce;
+
+    @BeforeMethod
+    public void setUp() {
+        cmdProduce = new CmdProduce();
+        cmdProduce.updateConfig(null, null, "ws://localhost:8080/");
+    }
+
+    @Test
+    public void testGetProduceBaseEndPoint() {
+        String topicNameV1 = "persistent://public/cluster/default/issue-11067";
+        Assert.assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV1),
+                "ws://localhost:8080/ws/producer/persistent/public/cluster/default/issue-11067");
+        String topicNameV2 = "persistent://public/default/issue-11067";
+        Assert.assertEquals(cmdProduce.getProduceBaseEndPoint(topicNameV2),
+                "ws://localhost:8080/ws/v2/producer/persistent/public/default/issue-11067");
+    }
+}
\ No newline at end of file

[pulsar] 01/05: Make the compaction phase one loop timeout configurable (#11206)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 16633386930021324387906a14bb81920bdae304
Author: lipenghui <pe...@apache.org>
AuthorDate: Sun Jul 4 08:48:43 2021 +0800

    Make the compaction phase one loop timeout configurable (#11206)
    
    
    (cherry picked from commit 96e4fd69e5376fa2924517b045d02f8229679b92)
---
 conf/broker.conf                                           |  4 ++++
 .../org/apache/pulsar/broker/ServiceConfiguration.java     |  7 +++++++
 .../org/apache/pulsar/compaction/TwoPhaseCompactor.java    |  9 +++++++--
 .../java/org/apache/pulsar/compaction/CompactorTest.java   | 14 ++++++++++++++
 4 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index e8c7877..775f9aa 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -450,6 +450,10 @@ brokerServiceCompactionMonitorIntervalInSeconds=60
 # Using a value of 0, is disabling compression check.
 brokerServiceCompactionThresholdInBytes=0
 
+# Timeout for the compaction phase one loop.
+# If the execution time of the compaction phase one loop exceeds this time, the compaction will not proceed.
+brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
+
 # Whether to enable the delayed delivery for messages.
 # If disabled, messages will be immediately delivered and there will
 # be no tracking overhead.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8bd05c2..ebefd59 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1882,6 +1882,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private long brokerServiceCompactionThresholdInBytes = 0;
 
     @FieldContext(
+            category = CATEGORY_SERVER,
+            doc = "Timeout for the compaction phase one loop, If the execution time of the compaction " +
+                    "phase one loop exceeds this time, the compaction will not proceed."
+    )
+    private long brokerServiceCompactionPhaseOneLoopTimeInSeconds = 30;
+
+    @FieldContext(
         category = CATEGORY_SCHEMA,
         doc = "Enforce schema validation on following cases:\n\n"
             + " - if a producer without a schema attempts to produce to a topic with schema, the producer will be\n"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 800182e..0f0f981 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -60,13 +60,14 @@ public class TwoPhaseCompactor extends Compactor {
     private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
     private static final int MAX_OUTSTANDING = 500;
     private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
-    public static final Duration PHASE_ONE_LOOP_READ_TIMEOUT = Duration.ofSeconds(10);
+    private final Duration phaseOneLoopReadTimeout;
 
     public TwoPhaseCompactor(ServiceConfiguration conf,
                              PulsarClient pulsar,
                              BookKeeper bk,
                              ScheduledExecutorService scheduler) {
         super(conf, pulsar, bk, scheduler);
+        phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds());
     }
 
     @Override
@@ -116,7 +117,7 @@ public class TwoPhaseCompactor extends Compactor {
         }
         CompletableFuture<RawMessage> future = reader.readNextAsync();
         FutureUtil.addTimeoutHandling(future,
-                PHASE_ONE_LOOP_READ_TIMEOUT, scheduler,
+                phaseOneLoopReadTimeout, scheduler,
                 () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
 
         future.thenAcceptAsync(m -> {
@@ -399,4 +400,8 @@ public class TwoPhaseCompactor extends Compactor {
             this.latestForKey = latestForKey;
         }
     }
+
+    public long getPhaseOneLoopReadTimeoutInSeconds() {
+        return phaseOneLoopReadTimeout.getSeconds();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 57a2146..0d1a95c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -39,15 +39,19 @@ import java.util.concurrent.ScheduledExecutorService;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -220,6 +224,16 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         compactor.compact(topic).get();
     }
 
+    @Test
+    public void testPhaseOneLoopTimeConfiguration() {
+        ServiceConfiguration configuration = new ServiceConfiguration();
+        configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60);
+        TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, Mockito.mock(PulsarClientImpl.class),
+                Mockito.mock(BookKeeper.class), compactionScheduler);
+        Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60);
+
+    }
+
     public ByteBuf extractPayload(RawMessage m) throws Exception {
         ByteBuf payloadAndMetadata = m.getHeadersAndPayload();
         Commands.skipChecksumIfPresent(payloadAndMetadata);

[pulsar] 02/05: Remove unused listener to reduce creating executor pool. (#11215)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1b3582c28630f0603657b7395790f5f621dd3084
Author: Tboy <te...@apache.org>
AuthorDate: Sun Jul 4 16:22:50 2021 +0800

    Remove unused listener to reduce creating executor pool. (#11215)
    
    
    (cherry picked from commit c1c0e9085a4c4e57d83e9ab67d53163afe48f3d4)
---
 .../offload/filesystem/impl/FileSystemManagedLedgerOffloader.java      | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
index 922332b..ddf83aa 100644
--- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
+++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java
@@ -203,8 +203,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader {
                     semaphore.acquire();
                     countDownLatch = new CountDownLatch(1);
                     assignmentScheduler.chooseThread(ledgerId).submit(FileSystemWriter.create(ledgerEntriesOnce, dataWriter, semaphore,
-                            countDownLatch, haveOffloadEntryNumber, this)).addListener(() -> {
-                    }, Executors.newSingleThreadExecutor());
+                            countDownLatch, haveOffloadEntryNumber, this));
                     needToOffloadFirstEntryNumber = end + 1;
                 } while (needToOffloadFirstEntryNumber - 1 != readHandle.getLastAddConfirmed() && fileSystemWriteException == null);
                 countDownLatch.await();