You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/06/06 08:16:54 UTC

[pulsar] branch branch-2.10 updated (9dd4057d5bc -> 77292ad49fc)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 9dd4057d5bc PIP-105 add support for updating the Subscription properties (#15751)
     new a85fdcec5cb [Broker, Functions, Websocket] Disable memory limit controller in internal Pulsar clients (#15752)
     new 774e22c2f47 [Broker] Add timeout to closing CoordinationServiceImpl (#15777)
     new d847e1cde5b [Cli tools] Disable Pulsar client memory limit by default (#15748)
     new 3f3d4525c49 [Tests] Fix OutOfMemoryError and NoClassDefFoundErrors in tests (#15911)
     new ec0215c1cf6 [ML] Fix race condition in getManagedLedgerInternalStats when includeLedgerMetadata=true (#15918)
     new 77292ad49fc Enable TCP/IP keepalive for all ZK client connections in all components (#15908)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 bin/pulsar                                         |  4 +-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 86 +++++++++++++---------
 .../pulsar/broker/namespace/NamespaceService.java  |  6 +-
 .../apache/pulsar/compaction/CompactorTool.java    |  5 +-
 .../apache/pulsar/client/cli/PulsarClientTool.java |  4 +-
 .../client/impl/BatchMessageContainerImplTest.java | 33 +++++++--
 .../pulsar/functions/instance/InstanceUtils.java   |  4 +-
 .../runtime/thread/ThreadRuntimeFactoryTest.java   |  5 +-
 .../pulsar/functions/worker/WorkerUtils.java       |  5 +-
 .../coordination/impl/CoordinationServiceImpl.java |  7 +-
 .../pulsar/testclient/LoadSimulationClient.java    |  2 +
 .../pulsar/testclient/PerformanceConsumer.java     |  2 +
 .../pulsar/testclient/PerformanceProducer.java     |  2 +
 .../pulsar/testclient/PerformanceReader.java       |  2 +
 .../pulsar/testclient/PerformanceTransaction.java  |  5 +-
 .../apache/pulsar/websocket/WebSocketService.java  |  2 +
 16 files changed, 120 insertions(+), 54 deletions(-)


[pulsar] 06/06: Enable TCP/IP keepalive for all ZK client connections in all components (#15908)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 77292ad49fcffab9ea0f3c8b70fdb6ddcf4f0f04
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Jun 4 19:27:02 2022 +0300

    Enable TCP/IP keepalive for all ZK client connections in all components (#15908)
    
    (cherry picked from commit ede3d19399f5d58e5218a6e7bf1f86a2c44eadc9)
---
 bin/pulsar | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/bin/pulsar b/bin/pulsar
index 43e137af531..f1067c07a3f 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -270,6 +270,8 @@ OPTS="$OPTS -Dlog4j.configurationFile=`basename $PULSAR_LOG_CONF`"
 # rarely needed when trying to list many z-nodes under a
 # directory)
 OPTS="$OPTS -Djute.maxbuffer=10485760 -Djava.net.preferIPv4Stack=true"
+# Enable TCP keepalive for all Zookeeper client connections
+OPTS="$OPTS -Dzookeeper.clientTcpKeepAlive=true"
 
 # Allow Netty to use reflection access
 OPTS="$OPTS -Dio.netty.tryReflectionSetAccessible=true"
@@ -311,7 +313,7 @@ OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR
 OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"
 OPTS="$OPTS -Dpulsar.functions.log.conf=${FUNCTIONS_LOG_CONF}"
 
-ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=* -Dzookeeper.snapshot.trust.empty=true -Dzookeeper.tcpKeepAlive=true -Dzookeeper.clientTcpKeepAlive=true"
+ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=* -Dzookeeper.snapshot.trust.empty=true -Dzookeeper.tcpKeepAlive=true"
 
 LOG4J2_SHUTDOWN_HOOK_DISABLED="-Dlog4j.shutdownHookEnabled=false"
 


[pulsar] 04/06: [Tests] Fix OutOfMemoryError and NoClassDefFoundErrors in tests (#15911)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3f3d4525c495e3da3b98b6929613761f30871a52
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Jun 3 03:52:40 2022 +0300

    [Tests] Fix OutOfMemoryError and NoClassDefFoundErrors in tests (#15911)
    
    Fixes #15689
    Fixes #15793
    Fixes #15897
    
    ### Motivation
    
    The `PulsarByteBufAllocator.DEFAULT` instance was replaced in BatchMessageContainerImplTest but never restored. This caused strange issues in many unit tests; the OutOfMemoryError and NoClassDefFoundError failures were most likely caused by BatchMessageContainerImplTest.
    
    ### Modifications
    
    Fix the problems in BatchMessageContainerImplTest and reset the state after the test completes.
    
    ### Additional context
    
    The flaky test was introduced by #12170, which has been cherry-picked to branch-2.8 and branch-2.9 and is also included in branch-2.10.
    
    (cherry picked from commit ef1e0aa73592b4a81bd89a43628096a839b0bc26)
---
 .../client/impl/BatchMessageContainerImplTest.java | 33 ++++++++++++++++------
 1 file changed, 25 insertions(+), 8 deletions(-)

diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
index 54af93c87e3..8fc018b3199 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BatchMessageContainerImplTest.java
@@ -18,31 +18,38 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
 import org.testng.annotations.Test;
 
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.when;
-
 public class BatchMessageContainerImplTest {
 
     @Test
     public void recoveryAfterOom() throws Exception {
+        AtomicBoolean called = new AtomicBoolean();
         try (MockedConstruction<ByteBufAllocatorImpl> mocked = Mockito.mockConstruction(ByteBufAllocatorImpl.class,
                 (mockAllocator, context) -> {
+                    called.set(true);
                     doThrow(new OutOfMemoryError("test")).when(mockAllocator).buffer(anyInt(), anyInt());
                 })) {
-
+            if (PulsarByteBufAllocator.DEFAULT != null && !called.get()) {
+                replaceByteBufAllocator();
+            }
             final ProducerImpl producer = Mockito.mock(ProducerImpl.class);
             final ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
             producerConfigurationData.setCompressionType(CompressionType.NONE);
@@ -64,7 +71,17 @@ public class BatchMessageContainerImplTest {
             final MessageImpl<byte[]> message2 = MessageImpl.create(messageMetadata2, payload2, Schema.BYTES, null);
             // after oom, our add can self-healing, won't throw exception
             batchMessageContainer.add(message2, null);
+        } finally {
+            replaceByteBufAllocator();
         }
+
+    }
+
+    private void replaceByteBufAllocator() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
+        Method createByteBufAllocatorMethod = PulsarByteBufAllocator.class.getDeclaredMethod("createByteBufAllocator");
+        createByteBufAllocatorMethod.setAccessible(true);
+        Whitebox.setInternalState(PulsarByteBufAllocator.class, "DEFAULT",
+                createByteBufAllocatorMethod.invoke(null));
     }
 
 }


[pulsar] 01/06: [Broker, Functions, Websocket] Disable memory limit controller in internal Pulsar clients (#15752)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a85fdcec5cb3ed311f50206ac650ae0cc0f68b94
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri May 27 10:52:13 2022 +0300

    [Broker, Functions, Websocket] Disable memory limit controller in internal Pulsar clients (#15752)
    
    (cherry picked from commit ec52320f16b1f7e04f4cef16dc3082779dfd4d50)
---
 .../java/org/apache/pulsar/broker/namespace/NamespaceService.java   | 6 ++++--
 .../src/main/java/org/apache/pulsar/compaction/CompactorTool.java   | 5 +++--
 .../java/org/apache/pulsar/functions/instance/InstanceUtils.java    | 4 +++-
 .../pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java   | 5 ++++-
 .../main/java/org/apache/pulsar/functions/worker/WorkerUtils.java   | 5 ++++-
 .../src/main/java/org/apache/pulsar/websocket/WebSocketService.java | 2 ++
 6 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 58abf48d9b0..9cc427aae67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@@ -1253,8 +1254,9 @@ public class NamespaceService implements AutoCloseable {
         return namespaceClients.computeIfAbsent(cluster, key -> {
             try {
                 ClientBuilder clientBuilder = PulsarClient.builder()
-                    .enableTcpNoDelay(false)
-                    .statsInterval(0, TimeUnit.SECONDS);
+                        .memoryLimit(0, SizeUnit.BYTES)
+                        .enableTcpNoDelay(false)
+                        .statsInterval(0, TimeUnit.SECONDS);
 
                 // Apply all arbitrary configuration. This must be called before setting any fields annotated as
                 // @Secret on the ClientConfigurationData object because of the way they are serialized.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index 691217d9d77..59dd4c20aab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.CmdGenerateDocs;
@@ -104,8 +105,8 @@ public class CompactorTool {
             );
         }
 
-        ClientBuilder clientBuilder = PulsarClient.builder();
-
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .memoryLimit(0, SizeUnit.BYTES);
         // Apply all arbitrary configuration. This must be called before setting any fields annotated as
         // @Secret on the ClientConfigurationData object because of the way they are serialized.
         // See https://github.com/apache/pulsar/issues/8509 for more information.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index d52c7dbefdb..d19e79d4026 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -161,7 +161,9 @@ public class InstanceUtils {
                                                           Optional<Long> memoryLimit) throws PulsarClientException {
         ClientBuilder clientBuilder = null;
         if (isNotBlank(pulsarServiceUrl)) {
-            clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+            clientBuilder = PulsarClient.builder()
+                    .memoryLimit(0, SizeUnit.BYTES)
+                    .serviceUrl(pulsarServiceUrl);
             if (authConfig != null) {
                 if (isNotBlank(authConfig.getClientAuthenticationPlugin())
                         && isNotBlank(authConfig.getClientAuthenticationParameters())) {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
index c86e1d5f2d6..9462463c05a 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryTest.java
@@ -36,6 +36,8 @@ import org.testng.annotations.Test;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mockStatic;
@@ -79,7 +81,7 @@ public class ThreadRuntimeFactoryTest {
 
         ClientBuilder clientBuilder = testMemoryLimit(null, null);
 
-        Mockito.verify(clientBuilder, Mockito.times(0)).memoryLimit(Mockito.anyLong(), Mockito.any());
+        Mockito.verify(clientBuilder, Mockito.times(1)).memoryLimit(Mockito.eq(0L), Mockito.eq(SizeUnit.BYTES));
     }
 
     @Test
@@ -110,6 +112,7 @@ public class ThreadRuntimeFactoryTest {
             ClientBuilder clientBuilder = Mockito.mock(ClientBuilder.class);
             mockedPulsarClient.when(() -> PulsarClient.builder()).thenAnswer(i -> clientBuilder);
             doReturn(clientBuilder).when(clientBuilder).serviceUrl(anyString());
+            doReturn(clientBuilder).when(clientBuilder).memoryLimit(anyLong(), any());
 
             ThreadRuntimeFactoryConfig threadRuntimeFactoryConfig = new ThreadRuntimeFactoryConfig();
             threadRuntimeFactoryConfig.setThreadGroupName("foo");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index d50d57ee27c..828b4e16516 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.functions.WorkerInfo;
@@ -295,7 +296,9 @@ public final class WorkerUtils {
                                                WorkerConfig workerConfig) {
 
         try {
-            ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+            ClientBuilder clientBuilder = PulsarClient.builder()
+                    .memoryLimit(0, SizeUnit.BYTES)
+                    .serviceUrl(pulsarServiceUrl);
 
             if (workerConfig != null) {
                 // Apply all arbitrary configuration. This must be called before setting any fields annotated as
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index dbac405a0e0..a57c6c491e7 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.internal.PropertiesUtils;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -175,6 +176,7 @@ public class WebSocketService implements Closeable {
 
     private PulsarClient createClientInstance(ClusterData clusterData) throws IOException {
         ClientBuilder clientBuilder = PulsarClient.builder() //
+                .memoryLimit(0, SizeUnit.BYTES)
                 .statsInterval(0, TimeUnit.SECONDS) //
                 .enableTls(config.isTlsEnabled()) //
                 .allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) //


[pulsar] 05/06: [ML] Fix race condition in getManagedLedgerInternalStats when includeLedgerMetadata=true (#15918)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ec0215c1cf613fc71899d6156920b0f7c1e00528
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sat Jun 4 10:36:00 2022 +0300

    [ML] Fix race condition in getManagedLedgerInternalStats when includeLedgerMetadata=true (#15918)
    
    - getLedgerMetadata is an asynchronous operation and the final result shouldn't complete before the
      metadata for all ledgers has been retrieved
    
    (cherry picked from commit fe19b3ca949009c952270a64ba94dbb329ea572f)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 86 +++++++++++++---------
 1 file changed, 51 insertions(+), 35 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1c7297d880a..87f97ca8329 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -37,6 +37,7 @@ import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.time.Clock;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -4029,45 +4030,60 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         stats.lastConfirmedEntry = this.getLastConfirmedEntry().toString();
         stats.state = this.getState().toString();
 
-        stats.ledgers = Lists.newArrayList();
-        this.getLedgersInfo().forEach((id, li) -> {
-            ManagedLedgerInternalStats.LedgerInfo info = new ManagedLedgerInternalStats.LedgerInfo();
-            info.ledgerId = li.getLedgerId();
-            info.entries = li.getEntries();
-            info.size = li.getSize();
-            info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
-            stats.ledgers.add(info);
-            if (includeLedgerMetadata) {
-                this.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
-                    if (ex == null) {
-                        info.metadata = lMetadata;
-                    }
+        stats.cursors = Maps.newTreeMap();
+        this.getCursors().forEach(c -> {
+            ManagedCursorImpl cursor = (ManagedCursorImpl) c;
+            PersistentTopicInternalStats.CursorStats cs = new PersistentTopicInternalStats.CursorStats();
+            cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
+            cs.readPosition = cursor.getReadPosition().toString();
+            cs.waitingReadOp = cursor.hasPendingReadRequest();
+            cs.pendingReadOps = cursor.getPendingReadOpsCount();
+            cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
+            cs.cursorLedger = cursor.getCursorLedger();
+            cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
+            cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
+            cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
+            cs.state = cursor.getState();
+            cs.numberOfEntriesSinceFirstNotAckedMessage = cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
+            cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange();
+            cs.properties = cursor.getProperties();
+            stats.cursors.put(cursor.getName(), cs);
+        });
+
+        // make a snapshot of the ledgers infos since we are iterating it twice when metadata is included
+        // a list is sufficient since there's no need to lookup by the ledger id
+        List<LedgerInfo> ledgersInfos = new ArrayList<>(this.getLedgersInfo().values());
+
+        // add asynchronous metadata retrieval operations to a hashmap
+        Map<Long, CompletableFuture<String>> ledgerMetadataFutures = new HashMap();
+        if (includeLedgerMetadata) {
+            ledgersInfos.forEach(li -> {
+                long ledgerId = li.getLedgerId();
+                ledgerMetadataFutures.put(ledgerId, this.getLedgerMetadata(ledgerId).exceptionally(throwable -> {
+                    log.warn("Getting metadata for ledger {} failed.", ledgerId, throwable);
                     return null;
-                });
-            }
+                }));
+            });
+        }
 
-            stats.cursors = Maps.newTreeMap();
-            this.getCursors().forEach(c -> {
-                ManagedCursorImpl cursor = (ManagedCursorImpl) c;
-                PersistentTopicInternalStats.CursorStats cs = new PersistentTopicInternalStats.CursorStats();
-                cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
-                cs.readPosition = cursor.getReadPosition().toString();
-                cs.waitingReadOp = cursor.hasPendingReadRequest();
-                cs.pendingReadOps = cursor.getPendingReadOpsCount();
-                cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
-                cs.cursorLedger = cursor.getCursorLedger();
-                cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
-                cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
-                cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
-                cs.state = cursor.getState();
-                cs.numberOfEntriesSinceFirstNotAckedMessage =
-                        cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
-                cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange();
-                cs.properties = cursor.getProperties();
-                stats.cursors.put(cursor.getName(), cs);
+        // wait until metadata has been retrieved
+        FutureUtil.waitForAll(ledgerMetadataFutures.values()).thenAccept(__ -> {
+            stats.ledgers = Lists.newArrayList();
+            ledgersInfos.forEach(li -> {
+                ManagedLedgerInternalStats.LedgerInfo info = new ManagedLedgerInternalStats.LedgerInfo();
+                info.ledgerId = li.getLedgerId();
+                info.entries = li.getEntries();
+                info.size = li.getSize();
+                info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
+                if (includeLedgerMetadata) {
+                    // lookup metadata from the hashmap which contains completed async operations
+                    info.metadata = ledgerMetadataFutures.get(li.getLedgerId()).getNow(null);
+                }
+                stats.ledgers.add(info);
             });
+            statFuture.complete(stats);
         });
-        statFuture.complete(stats);
+
         return statFuture;
     }
 


[pulsar] 03/06: [Cli tools] Disable Pulsar client memory limit by default (#15748)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d847e1cde5b5aee85d6a3ae9fd5b2bb3fa6fd874
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Jun 2 16:53:07 2022 +0300

    [Cli tools] Disable Pulsar client memory limit by default (#15748)
    
    - There's a regression with the tools since the memory limit cannot be adjusted
      - It's better to default to the previous setting of disabling memory limits
        so that the performance profile doesn't change because of the memory limit.
    
    (cherry picked from commit a6a7516aaf5a27ecfec083f0eb868e4ebbcbbef8)
---
 .../src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java | 4 +++-
 .../main/java/org/apache/pulsar/testclient/LoadSimulationClient.java | 2 ++
 .../main/java/org/apache/pulsar/testclient/PerformanceConsumer.java  | 2 ++
 .../main/java/org/apache/pulsar/testclient/PerformanceProducer.java  | 2 ++
 .../main/java/org/apache/pulsar/testclient/PerformanceReader.java    | 2 ++
 .../java/org/apache/pulsar/testclient/PerformanceTransaction.java    | 5 ++++-
 6 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index fe716941dbc..617ed6b5a7c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.api.SizeUnit;
 
 @Parameters(commandDescription = "Produce or consume messages on a specified topic")
 public class PulsarClientTool {
@@ -119,7 +120,8 @@ public class PulsarClientTool {
     }
 
     private void updateConfig() throws UnsupportedAuthenticationException {
-        ClientBuilder clientBuilder = PulsarClient.builder();
+        ClientBuilder clientBuilder = PulsarClient.builder()
+                .memoryLimit(0, SizeUnit.BYTES);
         Authentication authentication = null;
         if (isNotBlank(this.authPluginClassName)) {
             authentication = AuthenticationFactory.create(authPluginClassName, authParams);
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
index 34def55b9db..04f1be4e382 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationClient.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -317,6 +318,7 @@ public class LoadSimulationClient {
                     .serviceHttpUrl(arguments.serviceURL)
                     .build();
         client = PulsarClient.builder()
+                    .memoryLimit(0, SizeUnit.BYTES)
                     .serviceUrl(arguments.serviceURL)
                     .connectionsPerBroker(4)
                     .ioThreads(Runtime.getRuntime().availableProcessors())
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 9d1920ac4f6..029f4ee8de5 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
@@ -347,6 +348,7 @@ public class PerformanceConsumer {
         long testEndTime = startTime + (long) (arguments.testTime * 1e9);
 
         ClientBuilder clientBuilder = PulsarClient.builder() //
+                .memoryLimit(0, SizeUnit.BYTES)
                 .enableTransaction(arguments.isEnableTransaction)
                 .serviceUrl(arguments.serviceURL) //
                 .connectionsPerBroker(arguments.maxConnections) //
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index f18f4a84e13..554d57972c9 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -68,6 +68,7 @@ import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -572,6 +573,7 @@ public class PerformanceProducer {
             List<Future<Producer<byte[]>>> futures = new ArrayList<>();
 
             ClientBuilder clientBuilder = PulsarClient.builder() //
+                    .memoryLimit(0, SizeUnit.BYTES)
                     .enableTransaction(arguments.isEnableTransaction)//
                     .serviceUrl(arguments.serviceURL) //
                     .connectionsPerBroker(arguments.maxConnections) //
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index dee3a6c57c1..31559536560 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -257,6 +258,7 @@ public class PerformanceReader {
         };
 
         ClientBuilder clientBuilder = PulsarClient.builder() //
+                .memoryLimit(0, SizeUnit.BYTES)
                 .serviceUrl(arguments.serviceURL) //
                 .connectionsPerBroker(arguments.maxConnections) //
                 .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) //
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 31e566919ba..2410bd615af 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
@@ -295,7 +296,9 @@ public class PerformanceTransaction {
         }
 
         ClientBuilder clientBuilder =
-                PulsarClient.builder().enableTransaction(!arguments.isDisableTransaction)
+                PulsarClient.builder()
+                        .memoryLimit(0, SizeUnit.BYTES)
+                        .enableTransaction(!arguments.isDisableTransaction)
                         .serviceUrl(arguments.serviceURL)
                         .connectionsPerBroker(arguments.maxConnections)
                         .statsInterval(0, TimeUnit.SECONDS)


[pulsar] 02/06: [Broker] Add timeout to closing CoordinationServiceImpl (#15777)

Posted by lh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 774e22c2f4726709f2cb0246054baae5a8218d59
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue May 31 17:12:54 2022 +0300

    [Broker] Add timeout to closing CoordinationServiceImpl (#15777)
    
    Fixes #15774
    
    Also shut down the executor, which previously was not closed
    
    (cherry picked from commit 1266f913678804d4cb7f2142458e87d6155f8bd4)
---
 .../pulsar/metadata/coordination/impl/CoordinationServiceImpl.java | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
index 4bda85c64fb..2b7e38b6c44 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/CoordinationServiceImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.metadata.coordination.impl;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -29,6 +30,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.metadata.api.MetadataSerde;
@@ -43,6 +45,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 @SuppressWarnings("unchecked")
 public class CoordinationServiceImpl implements CoordinationService {
 
+    private static final Duration CLOSE_TIMEOUT = Duration.ofSeconds(10);
     private final MetadataStoreExtended store;
 
     private final Map<Object, LockManager<?>> lockManagers = new ConcurrentHashMap<>();
@@ -69,9 +72,11 @@ public class CoordinationServiceImpl implements CoordinationService {
                 futures.add(lm.asyncClose());
             }
 
-            FutureUtils.collect(futures).join();
+            FutureUtils.collect(futures).get(CLOSE_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
         } catch (CompletionException ce) {
             throw MetadataStoreException.unwrap(ce);
+        } finally {
+            executor.shutdownNow();
         }
     }