You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/04/26 12:39:39 UTC

(pulsar) branch branch-3.2 updated: [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new b764afa2bb3 [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
b764afa2bb3 is described below

commit b764afa2bb367e20849c1297b282beb5d746b474
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 26 15:16:47 2024 +0300

    [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
    
    (cherry picked from commit 69839c72f1375d141b56734bc5e041c13e366c57)
---
 .../org/apache/pulsar/metadata/impl/ZKMetadataStore.java  | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 079ae3e2ae5..2e88cb33324 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -192,7 +192,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
                     Code code = Code.get(rc);
                     if (code == Code.CONNECTIONLOSS) {
                         // There is the chance that we caused a connection reset by sending or requesting a batch
-                        // that passed the max ZK limit. Retry with the individual operations
+                        // that passed the max ZK limit.
+
+                        // Build the log warning message
+                        // summarize the operations by type
+                        String countsByType = ops.stream().collect(
+                                        Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
+                                .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+                                .collect(Collectors.joining(", "));
+                        Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
+                        log.warn("Connection loss while executing batch operation of {} "
+                                + "of total data size of {}. "
+                                + "Retrying individual operations one-by-one.", countsByType, totalSize);
+
+                        // Retry with the individual operations
                         executor.schedule(() -> {
                             ops.forEach(o -> batchOperation(Collections.singletonList(o)));
                         }, 100, TimeUnit.MILLISECONDS);