You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/02 12:33:03 UTC

[sling-org-apache-sling-distribution-journal-kafka] branch master updated: SLING-8533 - Send events for exceptions in kafka send and receive

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new a1481a0  SLING-8533 - Send events for exceptions in kafka send and receive
a1481a0 is described below

commit a1481a09930b5d48dc968924e463fb84ba9de8b6
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Jul 2 14:27:44 2019 +0200

    SLING-8533 - Send events for exceptions in kafka send and receive
---
 pom.xml                                                 |  5 +++++
 .../distribution/journal/kafka/KafkaClientProvider.java | 17 +++++++++++++----
 .../journal/kafka/KafkaJsonMessagePoller.java           | 14 ++++++++++----
 .../journal/kafka/KafkaJsonMessageSender.java           |  7 ++++++-
 .../distribution/journal/kafka/KafkaMessagePoller.java  | 10 ++++++++--
 .../distribution/journal/kafka/KafkaMessageSender.java  |  7 ++++++-
 .../journal/kafka/KafkaClientProviderTest.java          |  6 ++++++
 .../journal/kafka/KafkaMessageSenderTest.java           |  4 ++++
 .../distribution/journal/kafka/util/KafkaRule.java      |  1 -
 9 files changed, 58 insertions(+), 13 deletions(-)

diff --git a/pom.xml b/pom.xml
index d49b3ee..6908778 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,11 @@
             <artifactId>org.osgi.service.metatype.annotations</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>osgi.cmpn</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <!-- Apache Kafka -->
         <dependency>
             <groupId>org.apache.servicemix.bundles</groupId>
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index bc515eb..4b31bbb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -57,6 +57,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.JsonMessageSender;
 import org.apache.sling.distribution.journal.MessageHandler;
@@ -68,6 +69,8 @@ import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.ConfigurationPolicy;
 import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
 import org.osgi.service.metatype.annotations.Designate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,6 +84,11 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaClientProvider.class);
 
     public static final int PARTITION = 0;
+    
+    @Reference
+    private EventAdmin eventAdmin;
+
+    private ExceptionEventSender eventSender;
 
     private volatile KafkaProducer<String, byte[]> rawProducer = null;
 
@@ -100,6 +108,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
 
     @Activate
     public void activate(KafkaEndpoint kafkaEndpoint) {
+        eventSender = new ExceptionEventSender(eventAdmin);
         kafkaBootstrapServers = requireNonNull(kafkaEndpoint.kafkaBootstrapServers());
         requestTimeout = kafkaEndpoint.kafkaRequestTimeout();
         defaultApiTimeout = kafkaEndpoint.kafkaDefaultApiTimeout();
@@ -116,7 +125,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
 
     @Override
     public <T extends GeneratedMessage> MessageSender<T> createSender() {
-        return new KafkaMessageSender<>(buildKafkaProducer());
+        return new KafkaMessageSender<>(buildKafkaProducer(), eventSender);
     }
 
     @Override
@@ -137,14 +146,14 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
         } else {
             consumer.seekToEnd(topicPartitions);
         }
-        Closeable poller = new KafkaMessagePoller(consumer, adapters);
+        Closeable poller = new KafkaMessagePoller(consumer, eventSender, adapters);
         LOG.info("Created poller for reset {}, topicName {}, assign {}", reset, topicName, assign);
         return poller;
     }
 
     @Override
     public <T> JsonMessageSender<T> createJsonSender() {
-        return new KafkaJsonMessageSender<>(buildJsonKafkaProducer());
+        return new KafkaJsonMessageSender<>(buildJsonKafkaProducer(), eventSender);
     }
 
     @Override
@@ -158,7 +167,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
         } else {
             consumer.seekToEnd(topicPartitions);
         }
-        return new KafkaJsonMessagePoller<>(consumer, handler, type);
+        return new KafkaJsonMessagePoller<>(consumer, eventSender, handler, type);
     }
 
     @Override
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
index 7810368..3597d7a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
@@ -21,6 +21,7 @@ package org.apache.sling.distribution.journal.kafka;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageInfo;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
 import static java.lang.String.format;
 import static java.time.Duration.ofHours;
+import static java.util.Objects.requireNonNull;
 
 public class KafkaJsonMessagePoller<T> implements Closeable {
 
@@ -47,11 +49,14 @@ public class KafkaJsonMessagePoller<T> implements Closeable {
 
     private final ObjectReader reader;
 
-    public KafkaJsonMessagePoller(KafkaConsumer<String, String> consumer, MessageHandler<T> handler, Class<T> clazz) {
-        this.consumer = consumer;
-        this.handler = handler;
+    private final ExceptionEventSender eventSender;
+
+    public KafkaJsonMessagePoller(KafkaConsumer<String, String> consumer, ExceptionEventSender eventSender, MessageHandler<T> handler, Class<T> clazz) {
+        this.consumer = requireNonNull(consumer);
+        this.eventSender = requireNonNull(eventSender);
+        this.handler = requireNonNull(handler);
         ObjectMapper mapper = new ObjectMapper();
-        reader = mapper.readerFor(clazz);
+        reader = mapper.readerFor(requireNonNull(clazz));
         startBackgroundThread(this::run, format("Message Json Poller for handler %s", handler));
     }
 
@@ -96,6 +101,7 @@ public class KafkaJsonMessagePoller<T> implements Closeable {
             T message = reader.readValue(payload);
             handler.handle(info, message);
         } catch (IOException e) {
+            eventSender.send(e);
             LOG.error("Failed to parse payload {}", payload);
         }
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
index b07fb91..83fb495 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
@@ -25,6 +25,7 @@ import static org.apache.sling.distribution.journal.kafka.KafkaClientProvider.PA
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.JsonMessageSender;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.slf4j.Logger;
@@ -41,7 +42,10 @@ public class KafkaJsonMessageSender<T> implements JsonMessageSender<T> {
 
     private final KafkaProducer<String, String> producer;
 
-    public KafkaJsonMessageSender(KafkaProducer<String, String> producer) {
+    private final ExceptionEventSender eventSender;
+
+    public KafkaJsonMessageSender(KafkaProducer<String, String> producer, ExceptionEventSender eventSender) {
+        this.eventSender = eventSender;
         this.producer = requireNonNull(producer);
     }
 
@@ -57,6 +61,7 @@ public class KafkaJsonMessageSender<T> implements JsonMessageSender<T> {
             RecordMetadata metadata = producer.send(record).get();
             LOG.info(format("Sent JSON to %s", metadata));
         } catch (Exception e) {
+            eventSender.send(e);
             throw new MessagingException(format("Failed to send JSON message on topic %s", topic), e);
         }
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 8dfec8f..8046901 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.sling.distribution.journal.messages.Types;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageInfo;
 import com.google.protobuf.ByteString;
@@ -40,6 +41,7 @@ import static java.lang.Integer.parseInt;
 import static java.lang.String.format;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.time.Duration.ofHours;
+import static java.util.Objects.requireNonNull;
 
 public class KafkaMessagePoller implements Closeable {
 
@@ -53,8 +55,11 @@ public class KafkaMessagePoller implements Closeable {
 
     private final String types;
 
-    public KafkaMessagePoller(KafkaConsumer<String, byte[]> consumer, HandlerAdapter<?>... handlerAdapters) {
-        this.consumer = consumer;
+    private final ExceptionEventSender eventSender;
+
+    public KafkaMessagePoller(KafkaConsumer<String, byte[]> consumer, ExceptionEventSender eventSender, HandlerAdapter<?>... handlerAdapters) {
+        this.consumer = requireNonNull(consumer);
+        this.eventSender = requireNonNull(eventSender);
         for (HandlerAdapter<?> handlerAdapter : handlerAdapters) {
             handlers.put(handlerAdapter.getType(), handlerAdapter);
         }
@@ -112,6 +117,7 @@ public class KafkaMessagePoller implements Closeable {
             try {
                 handleRecord(adapter, record);
             } catch (Exception e) {
+                eventSender.send(e);
                 String msg = format("Error consuming message for types %s", types);
                 LOG.warn(msg);
             }
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
index a29bfe4..1c96af9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.sling.distribution.journal.messages.Types;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingException;
 import com.google.protobuf.GeneratedMessage;
@@ -44,7 +45,10 @@ public class KafkaMessageSender<T extends GeneratedMessage> implements MessageSe
 
     private final KafkaProducer<String, byte[]> producer;
 
-    public KafkaMessageSender(KafkaProducer<String, byte[]> producer) {
+    private final ExceptionEventSender eventSender;
+
+    public KafkaMessageSender(KafkaProducer<String, byte[]> producer, ExceptionEventSender eventSender) {
+        this.eventSender = requireNonNull(eventSender);
         this.producer = requireNonNull(producer);
     }
 
@@ -63,6 +67,7 @@ public class KafkaMessageSender<T extends GeneratedMessage> implements MessageSe
             RecordMetadata metadata = producer.send(record).get();
             LOG.info(format("Sent to %s", metadata));
         } catch (InterruptedException | ExecutionException e) {
+            eventSender.send(e);
             throw new MessagingException(format("Failed to send message on topic %s", topic), e);
         }
     }
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
index 5ed0699..cafdd69 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
@@ -39,15 +39,21 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.Spy;
 import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.service.event.EventAdmin;
 
 @RunWith(MockitoJUnitRunner.class)
 public class KafkaClientProviderTest {
     private static final String TOPIC = "topic";
     
+    @Mock
+    EventAdmin eventAdmin;
+    
+    @InjectMocks
     @Spy
     private KafkaClientProvider provider;
     
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java
index eee5a22..5746269 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Future;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.messages.Messages.ClearCommand;
 import org.apache.sling.distribution.journal.messages.Messages.PingMessage;
@@ -44,6 +45,9 @@ public class KafkaMessageSenderTest {
     private static final String TOPIC = "topic";
 
     @Mock
+    private ExceptionEventSender eventSender;
+    
+    @Mock
     private KafkaProducer<String, byte[]> producer;
     
     @InjectMocks
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
index 87a7e6a..851fdfe 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
@@ -31,7 +31,6 @@ import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
 
 public class KafkaRule implements TestRule {
-
     private KafkaClientProvider provider;
 
     @Override