You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/18 06:27:30 UTC

[pulsar] 07/15: add additional error handling in auto partition update task MultiTopicsConsumerImpl (#12620)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 361c4c1d64391576ab5f131b1ce8a60ca68712dc
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Nov 4 08:24:45 2021 -0700

    add additional error handling in auto partition update task MultiTopicsConsumerImpl (#12620)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
    (cherry picked from commit 11cfbe4f5cf66ac93ec0fb6dd2ea160a0a8bf6cf)
---
 .../client/impl/MultiTopicsConsumerImpl.java       | 33 +++++++++++++---------
 .../client/impl/PartitionedProducerImpl.java       | 30 +++++++++++---------
 2 files changed, 37 insertions(+), 26 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 0de4bcc..faa2d65 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1326,28 +1326,35 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                     topicName, oldPartitionNumber, currentPartitionNumber);
                 return FutureUtil.failedFuture(new NotSupportedException("not support shrink topic partitions"));
             }
+        }).exceptionally(throwable -> {
+            log.warn("[{}] Failed to get partitions for topic to determine if new partitions are added", throwable);
+            return null;
         });
     }
 
     private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
         @Override
         public void run(Timeout timeout) throws Exception {
-            if (timeout.isCancelled() || getState() != State.Ready) {
-                return;
-            }
+            try {
+                if (timeout.isCancelled() || getState() != State.Ready) {
+                    return;
+                }
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] run partitionsAutoUpdateTimerTask", topic);
-            }
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] run partitionsAutoUpdateTimerTask", topic);
+                }
 
-            // if last auto update not completed yet, do nothing.
-            if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
-                partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet());
+                // if last auto update not completed yet, do nothing.
+                if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
+                    partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet());
+                }
+            } catch (Throwable th) {
+                log.warn("Encountered error in partition auto update timer task for multi-topic consumer. Another task will be scheduled.", th);
+            } finally {
+                // schedule the next re-check task
+                partitionsAutoUpdateTimeout = client.timer()
+                        .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
             }
-
-            // schedule the next re-check task
-            partitionsAutoUpdateTimeout = client.timer()
-                .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
         }
     };
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 61b72fe..9ad0f14 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -356,22 +356,26 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
     private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
         @Override
         public void run(Timeout timeout) throws Exception {
-            if (timeout.isCancelled() || getState() != State.Ready) {
-                return;
-            }
+            try {
+                if (timeout.isCancelled() || getState() != State.Ready) {
+                    return;
+                }
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer", topic);
-            }
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer", topic);
+                }
 
-            // if last auto update not completed yet, do nothing.
-            if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
-                partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic));
+                // if last auto update not completed yet, do nothing.
+                if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
+                    partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic));
+                }
+            } catch (Throwable th) {
+                log.warn("Encountered error in partition auto update timer task for partition producer. Another task will be scheduled.", th);
+            } finally {
+                // schedule the next re-check task
+                partitionsAutoUpdateTimeout = client.timer()
+                        .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
             }
-
-            // schedule the next re-check task
-            partitionsAutoUpdateTimeout = client.timer()
-                .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
         }
     };