You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the mailing list archive.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/04/26 12:16:54 UTC
(pulsar) branch master updated: [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 69839c72f13 [improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
69839c72f13 is described below
commit 69839c72f1375d141b56734bc5e041c13e366c57
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Apr 26 15:16:47 2024 +0300
[improve][meta] Log a warning when ZK batch fails with connectionloss (#22566)
---
.../org/apache/pulsar/metadata/impl/ZKMetadataStore.java | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
index 079ae3e2ae5..2e88cb33324 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -192,7 +192,20 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
Code code = Code.get(rc);
if (code == Code.CONNECTIONLOSS) {
// There is the chance that we caused a connection reset by sending or requesting a batch
- // that passed the max ZK limit. Retry with the individual operations
+ // that passed the max ZK limit.
+
+ // Build the log warning message
+ // summarize the operations by type
+ String countsByType = ops.stream().collect(
+ Collectors.groupingBy(MetadataOp::getType, Collectors.summingInt(op -> 1)))
+ .entrySet().stream().map(e -> e.getValue() + " " + e.getKey().name() + " entries")
+ .collect(Collectors.joining(", "));
+ Long totalSize = ops.stream().collect(Collectors.summingLong(MetadataOp::size));
+ log.warn("Connection loss while executing batch operation of {} "
+ + "of total data size of {}. "
+ + "Retrying individual operations one-by-one.", countsByType, totalSize);
+
+ // Retry with the individual operations
executor.schedule(() -> {
ops.forEach(o -> batchOperation(Collections.singletonList(o)));
}, 100, TimeUnit.MILLISECONDS);