You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/04 08:28:26 UTC
[sling-org-apache-sling-distribution-journal-kafka] branch master
updated: SLING-8557 - Fix error handling
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 31a285a SLING-8557 - Fix error handling
31a285a is described below
commit 31a285a22ecd04007e554cd3961863021da2d0f1
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Thu Jul 4 10:28:16 2019 +0200
SLING-8557 - Fix error handling
---
.../journal/kafka/KafkaJsonMessagePoller.java | 37 +++++++++++-----------
.../journal/kafka/KafkaMessagePoller.java | 36 +++++++++++----------
2 files changed, 39 insertions(+), 34 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
index 3597d7a..e437ceb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
@@ -69,29 +69,30 @@ public class KafkaJsonMessagePoller<T> implements Closeable {
public void run() {
LOG.info("Start JSON poller for handler {}", handler);
- try {
- while(running) {
- consume();
- }
- } catch (WakeupException e) {
- if (running) {
- LOG.error("Waked up while running {}", e.getMessage(), e);
- throw e;
- } else {
+ while(running) {
+ try {
+ consumer.poll(ofHours(1))
+ .forEach(this::handleRecord);
+ } catch (WakeupException e) {
LOG.debug("Waked up while stopping {}", e.getMessage(), e);
+ running = false;
+ } catch(Exception e) {
+ eventSender.send(e);
+ LOG.error("Exception during recieve: {}", e.getMessage(), e);
+ sleepAfterError();
+ // Continue as KafkaConsumer should handle the error transparently
}
- } catch(Throwable t) {
- LOG.error(format("Catch Throwable %s closing consumer", t.getMessage()), t);
- throw t;
- } finally {
- consumer.close();
}
+ consumer.close();
LOG.info("Stop JSON poller for handler {}", handler);
}
-
- private void consume() {
- consumer.poll(ofHours(1))
- .forEach(this::handleRecord);
+
+ private void sleepAfterError() {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
}
private void handleRecord(ConsumerRecord<String, String> record) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 3e20d75..52ae1a6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -77,27 +77,32 @@ public class KafkaMessagePoller implements Closeable {
public void run() {
LOG.info("Start poller for types {}", types);
- try {
- while(running) {
+ while(running) {
+ try {
consumer.poll(ofHours(1))
- .forEach(this::handleRecord);
- }
- } catch (WakeupException e) {
- if (running) {
- LOG.error("Waked up while running {}", e.getMessage(), e);
- throw e;
- } else {
- LOG.debug("Waked up while stopping {}", e.getMessage(), e);
+ .forEach(this::handleRecord);
+ } catch (WakeupException e) {
+ LOG.debug("Waked up {}", e.getMessage(), e);
+ this.running = false;
+ } catch(Exception e) {
+ eventSender.send(e);
+ LOG.error("Exception while receiving from kafka: {}", e.getMessage(), e);
+ sleepAfterError();
+ // Continue as KafkaConsumer should handle the error transparently
}
- } catch(Throwable t) {
- LOG.error(format("Catch Throwable %s closing consumer", t.getMessage()), t);
- throw t;
- } finally {
- consumer.close();
}
+ consumer.close();
LOG.info("Stop poller for types {}", types);
}
+ private void sleepAfterError() {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
private void handleRecord(ConsumerRecord<String, byte[]> record) {
getHandler(record)
.ifPresent(handler->handleRecord(handler, record));
@@ -125,7 +130,6 @@ public class KafkaMessagePoller implements Closeable {
ByteString payload = ByteString.copyFrom(record.value());
handler.handle(info, payload);
} catch (Exception e) {
- eventSender.send(e);
String msg = format("Error consuming message for types %s", types);
LOG.warn(msg);
}