You are viewing a plain text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/18 06:27:30 UTC
[pulsar] 07/15: add additional error handling in auto partition update task MultiTopicsConsumerImpl (#12620)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 361c4c1d64391576ab5f131b1ce8a60ca68712dc
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Nov 4 08:24:45 2021 -0700
add additional error handling in auto partition update task MultiTopicsConsumerImpl (#12620)
Co-authored-by: Jerry Peng <je...@splunk.com>
(cherry picked from commit 11cfbe4f5cf66ac93ec0fb6dd2ea160a0a8bf6cf)
---
.../client/impl/MultiTopicsConsumerImpl.java | 33 +++++++++++++---------
.../client/impl/PartitionedProducerImpl.java | 30 +++++++++++---------
2 files changed, 37 insertions(+), 26 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 0de4bcc..faa2d65 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1326,28 +1326,35 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
topicName, oldPartitionNumber, currentPartitionNumber);
return FutureUtil.failedFuture(new NotSupportedException("not support shrink topic partitions"));
}
+ }).exceptionally(throwable -> {
+ log.warn("[{}] Failed to get partitions for topic to determine if new partitions are added", throwable);
+ return null;
});
}
private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
- if (timeout.isCancelled() || getState() != State.Ready) {
- return;
- }
+ try {
+ if (timeout.isCancelled() || getState() != State.Ready) {
+ return;
+ }
- if (log.isDebugEnabled()) {
- log.debug("[{}] run partitionsAutoUpdateTimerTask", topic);
- }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] run partitionsAutoUpdateTimerTask", topic);
+ }
- // if last auto update not completed yet, do nothing.
- if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
- partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet());
+ // if last auto update not completed yet, do nothing.
+ if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
+ partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(partitionedTopics.keySet());
+ }
+ } catch (Throwable th) {
+ log.warn("Encountered error in partition auto update timer task for multi-topic consumer. Another task will be scheduled.", th);
+ } finally {
+ // schedule the next re-check task
+ partitionsAutoUpdateTimeout = client.timer()
+ .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
-
- // schedule the next re-check task
- partitionsAutoUpdateTimeout = client.timer()
- .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
};
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 61b72fe..9ad0f14 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -356,22 +356,26 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
- if (timeout.isCancelled() || getState() != State.Ready) {
- return;
- }
+ try {
+ if (timeout.isCancelled() || getState() != State.Ready) {
+ return;
+ }
- if (log.isDebugEnabled()) {
- log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer", topic);
- }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer", topic);
+ }
- // if last auto update not completed yet, do nothing.
- if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
- partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic));
+ // if last auto update not completed yet, do nothing.
+ if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
+ partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(ImmutableList.of(topic));
+ }
+ } catch (Throwable th) {
+ log.warn("Encountered error in partition auto update timer task for partition producer. Another task will be scheduled.", th);
+ } finally {
+ // schedule the next re-check task
+ partitionsAutoUpdateTimeout = client.timer()
+ .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
-
- // schedule the next re-check task
- partitionsAutoUpdateTimeout = client.timer()
- .newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
};