You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/04 15:10:14 UTC

[sling-org-apache-sling-distribution-journal-kafka] branch master updated: SLING-8557 - Handle exception

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 529bbaf  SLING-8557 - Handle exception
529bbaf is described below

commit 529bbafbb2ff80d52a3c7987b3c67775b30b4260
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Thu Jul 4 17:10:04 2019 +0200

    SLING-8557 - Handle exception
---
 .../sling/distribution/journal/kafka/ProtobufRecordHandler.java    | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java b/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
index 9a331f2..4435bb4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.messages.Types;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +71,11 @@ public class ProtobufRecordHandler implements Consumer<ConsumerRecord<String, by
     private void handleRecord(HandlerAdapter<?> handler, ConsumerRecord<String, byte[]> record) {
         MessageInfo info = new KafkaMessageInfo(record);
         ByteString payload = ByteString.copyFrom(record.value());
-        handler.handle(info, payload);
+        try {
+            handler.handle(info, payload);
+        } catch (Exception e) {
+            throw new MessagingException(e.getMessage(), e);
+        }
     }
 
     private String getHeaderValue(Headers headers, String key) {