You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/03/01 00:06:14 UTC
[pulsar] 01/01: Retry creation of assignment topic a few times before giving up
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch retry_creation
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9f62a190b577243c1044e718de7f99ea3792693e
Author: Sanjeev Kulkarni <sa...@streaml.io>
AuthorDate: Thu Feb 28 16:05:33 2019 -0800
Retry creation of assignment topic a few times before giving up
---
.../pulsar/functions/worker/SchedulerManager.java | 23 +++++-----------------
1 file changed, 5 insertions(+), 18 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 2a93494..f50acc3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -105,25 +105,12 @@ public class SchedulerManager implements AutoCloseable {
.compressionType(CompressionType.LZ4)
.sendTimeout(0, TimeUnit.MILLISECONDS)
.createAsync().get(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.error("Exception while at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
+ }
+ try {
+ Thread.sleep(10000);
} catch (InterruptedException e) {
- log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- log.error("Encountered exceptions at creating producer for topic {}",
- config.getFunctionAssignmentTopic(), e);
- throw new RuntimeException(e);
- } catch (TimeoutException e) {
- try {
- log.info("Can't create a producer on assignment topic {} in {} seconds, retry in 10 seconds ...",
- stopwatch.elapsed(TimeUnit.SECONDS));
- TimeUnit.SECONDS.sleep(10);
- } catch (InterruptedException e1) {
- log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- continue;
}
}
throw new RuntimeException("Can't create a producer on assignment topic "