You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/04 08:28:26 UTC

[sling-org-apache-sling-distribution-journal-kafka] branch master updated: SLING-8557 - Fix error handling

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 31a285a  SLING-8557 - Fix error handling
31a285a is described below

commit 31a285a22ecd04007e554cd3961863021da2d0f1
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Thu Jul 4 10:28:16 2019 +0200

    SLING-8557 - Fix error handling
---
 .../journal/kafka/KafkaJsonMessagePoller.java      | 37 +++++++++++-----------
 .../journal/kafka/KafkaMessagePoller.java          | 36 +++++++++++----------
 2 files changed, 39 insertions(+), 34 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
index 3597d7a..e437ceb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
@@ -69,29 +69,30 @@ public class KafkaJsonMessagePoller<T> implements Closeable {
 
     public void run() {
         LOG.info("Start JSON poller for handler {}", handler);
-        try {
-            while(running) {
-                consume();
-            }
-        } catch (WakeupException e) {
-            if (running) {
-                LOG.error("Waked up while running {}", e.getMessage(), e);
-                throw e;
-            } else {
+        while(running) {
+            try {
+                consumer.poll(ofHours(1))
+                    .forEach(this::handleRecord);
+            } catch (WakeupException e) {
                 LOG.debug("Waked up while stopping {}", e.getMessage(), e);
+                running = false;
+            } catch(Exception e) {
+                eventSender.send(e);
+                LOG.error("Exception during recieve: {}", e.getMessage(), e);
+                sleepAfterError();
+                // Continue as KafkaConsumer should handle the error transparently
             }
-        } catch(Throwable t) {
-            LOG.error(format("Catch Throwable %s closing consumer", t.getMessage()), t);
-            throw t;
-        } finally {
-            consumer.close();
         }
+        consumer.close();
         LOG.info("Stop JSON poller for handler {}", handler);
     }
-
-    private void consume() {
-        consumer.poll(ofHours(1))
-                .forEach(this::handleRecord);
+    
+    private void sleepAfterError() {
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e1) {
+            Thread.currentThread().interrupt();
+        }
     }
 
     private void handleRecord(ConsumerRecord<String, String> record) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 3e20d75..52ae1a6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -77,27 +77,32 @@ public class KafkaMessagePoller implements Closeable {
 
     public void run() {
         LOG.info("Start poller for types {}", types);
-        try {
-            while(running) {
+        while(running) {
+            try {
                 consumer.poll(ofHours(1))
-                    .forEach(this::handleRecord);
-            }
-        } catch (WakeupException e) {
-            if (running) {
-                LOG.error("Waked up while running {}", e.getMessage(), e);
-                throw e;
-            } else {
-                LOG.debug("Waked up while stopping {}", e.getMessage(), e);
+                .forEach(this::handleRecord);
+            } catch (WakeupException e) {
+                LOG.debug("Waked up {}", e.getMessage(), e);
+                this.running = false;
+            } catch(Exception e) {
+                eventSender.send(e);
+                LOG.error("Exception while receiving from kafka: {}", e.getMessage(), e);
+                sleepAfterError();
+                // Continue as KafkaConsumer should handle the error transparently
             }
-        } catch(Throwable t) {
-            LOG.error(format("Catch Throwable %s closing consumer", t.getMessage()), t);
-            throw t;
-        } finally {
-            consumer.close();
         }
+        consumer.close();
         LOG.info("Stop poller for types {}", types);
     }
 
+    private void sleepAfterError() {
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e1) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
     private void handleRecord(ConsumerRecord<String, byte[]> record) {
         getHandler(record)
             .ifPresent(handler->handleRecord(handler, record));
@@ -125,7 +130,6 @@ public class KafkaMessagePoller implements Closeable {
             ByteString payload = ByteString.copyFrom(record.value());
             handler.handle(info, payload);
         } catch (Exception e) {
-            eventSender.send(e);
             String msg = format("Error consuming message for types %s", types);
             LOG.warn(msg);
         }