You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/03/01 00:06:14 UTC

[pulsar] 01/01: Retry creation of assignment topic a few times before giving up

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch retry_creation
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9f62a190b577243c1044e718de7f99ea3792693e
Author: Sanjeev Kulkarni <sa...@streaml.io>
AuthorDate: Thu Feb 28 16:05:33 2019 -0800

    Retry creation of assignment topic a few times before giving up
---
 .../pulsar/functions/worker/SchedulerManager.java  | 23 +++++-----------------
 1 file changed, 5 insertions(+), 18 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 2a93494..f50acc3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -105,25 +105,12 @@ public class SchedulerManager implements AutoCloseable {
                     .compressionType(CompressionType.LZ4)
                     .sendTimeout(0, TimeUnit.MILLISECONDS)
                     .createAsync().get(10, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                log.error("Exception while at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
+            }
+            try {
+                Thread.sleep(10000);
             } catch (InterruptedException e) {
-                log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
-                Thread.currentThread().interrupt();
-                throw new RuntimeException(e);
-            } catch (ExecutionException e) {
-                log.error("Encountered exceptions at creating producer for topic {}",
-                    config.getFunctionAssignmentTopic(), e);
-                throw new RuntimeException(e);
-            } catch (TimeoutException e) {
-                try {
-                    log.info("Can't create a producer on assignment topic {} in {} seconds, retry in 10 seconds ...",
-                        stopwatch.elapsed(TimeUnit.SECONDS));
-                    TimeUnit.SECONDS.sleep(10);
-                } catch (InterruptedException e1) {
-                    log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
-                    Thread.currentThread().interrupt();
-                    throw new RuntimeException(e);
-                }
-                continue;
             }
         }
         throw new RuntimeException("Can't create a producer on assignment topic "