You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/02 12:33:03 UTC
[sling-org-apache-sling-distribution-journal-kafka] branch master updated: SLING-8533 - Send events for exceptions in kafka send and receive
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new a1481a0 SLING-8533 - Send events for exceptions in kafka send and receive
a1481a0 is described below
commit a1481a09930b5d48dc968924e463fb84ba9de8b6
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Jul 2 14:27:44 2019 +0200
SLING-8533 - Send events for exceptions in kafka send and receive
---
pom.xml | 5 +++++
.../distribution/journal/kafka/KafkaClientProvider.java | 17 +++++++++++++----
.../journal/kafka/KafkaJsonMessagePoller.java | 14 ++++++++++----
.../journal/kafka/KafkaJsonMessageSender.java | 7 ++++++-
.../distribution/journal/kafka/KafkaMessagePoller.java | 10 ++++++++--
.../distribution/journal/kafka/KafkaMessageSender.java | 7 ++++++-
.../journal/kafka/KafkaClientProviderTest.java | 6 ++++++
.../journal/kafka/KafkaMessageSenderTest.java | 4 ++++
.../distribution/journal/kafka/util/KafkaRule.java | 1 -
9 files changed, 58 insertions(+), 13 deletions(-)
diff --git a/pom.xml b/pom.xml
index d49b3ee..6908778 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,6 +83,11 @@
<artifactId>org.osgi.service.metatype.annotations</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>osgi.cmpn</artifactId>
+ <scope>provided</scope>
+ </dependency>
<!-- Apache Kafka -->
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index bc515eb..4b31bbb 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -57,6 +57,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.JsonMessageSender;
import org.apache.sling.distribution.journal.MessageHandler;
@@ -68,6 +69,8 @@ import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +84,11 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaClientProvider.class);
public static final int PARTITION = 0;
+
+ @Reference
+ private EventAdmin eventAdmin;
+
+ private ExceptionEventSender eventSender;
private volatile KafkaProducer<String, byte[]> rawProducer = null;
@@ -100,6 +108,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
@Activate
public void activate(KafkaEndpoint kafkaEndpoint) {
+ eventSender = new ExceptionEventSender(eventAdmin);
kafkaBootstrapServers = requireNonNull(kafkaEndpoint.kafkaBootstrapServers());
requestTimeout = kafkaEndpoint.kafkaRequestTimeout();
defaultApiTimeout = kafkaEndpoint.kafkaDefaultApiTimeout();
@@ -116,7 +125,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
@Override
public <T extends GeneratedMessage> MessageSender<T> createSender() {
- return new KafkaMessageSender<>(buildKafkaProducer());
+ return new KafkaMessageSender<>(buildKafkaProducer(), eventSender);
}
@Override
@@ -137,14 +146,14 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
} else {
consumer.seekToEnd(topicPartitions);
}
- Closeable poller = new KafkaMessagePoller(consumer, adapters);
+ Closeable poller = new KafkaMessagePoller(consumer, eventSender, adapters);
LOG.info("Created poller for reset {}, topicName {}, assign {}", reset, topicName, assign);
return poller;
}
@Override
public <T> JsonMessageSender<T> createJsonSender() {
- return new KafkaJsonMessageSender<>(buildJsonKafkaProducer());
+ return new KafkaJsonMessageSender<>(buildJsonKafkaProducer(), eventSender);
}
@Override
@@ -158,7 +167,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
} else {
consumer.seekToEnd(topicPartitions);
}
- return new KafkaJsonMessagePoller<>(consumer, handler, type);
+ return new KafkaJsonMessagePoller<>(consumer, eventSender, handler, type);
}
@Override
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
index 7810368..3597d7a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
@@ -21,6 +21,7 @@ package org.apache.sling.distribution.journal.kafka;
import java.io.Closeable;
import java.io.IOException;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -34,6 +35,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
import static java.lang.String.format;
import static java.time.Duration.ofHours;
+import static java.util.Objects.requireNonNull;
public class KafkaJsonMessagePoller<T> implements Closeable {
@@ -47,11 +49,14 @@ public class KafkaJsonMessagePoller<T> implements Closeable {
private final ObjectReader reader;
- public KafkaJsonMessagePoller(KafkaConsumer<String, String> consumer, MessageHandler<T> handler, Class<T> clazz) {
- this.consumer = consumer;
- this.handler = handler;
+ private final ExceptionEventSender eventSender;
+
+ public KafkaJsonMessagePoller(KafkaConsumer<String, String> consumer, ExceptionEventSender eventSender, MessageHandler<T> handler, Class<T> clazz) {
+ this.consumer = requireNonNull(consumer);
+ this.eventSender = requireNonNull(eventSender);
+ this.handler = requireNonNull(handler);
ObjectMapper mapper = new ObjectMapper();
- reader = mapper.readerFor(clazz);
+ reader = mapper.readerFor(requireNonNull(clazz));
startBackgroundThread(this::run, format("Message Json Poller for handler %s", handler));
}
@@ -96,6 +101,7 @@ public class KafkaJsonMessagePoller<T> implements Closeable {
T message = reader.readValue(payload);
handler.handle(info, message);
} catch (IOException e) {
+ eventSender.send(e);
LOG.error("Failed to parse payload {}", payload);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
index b07fb91..83fb495 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
@@ -25,6 +25,7 @@ import static org.apache.sling.distribution.journal.kafka.KafkaClientProvider.PA
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.JsonMessageSender;
import org.apache.sling.distribution.journal.MessagingException;
import org.slf4j.Logger;
@@ -41,7 +42,10 @@ public class KafkaJsonMessageSender<T> implements JsonMessageSender<T> {
private final KafkaProducer<String, String> producer;
- public KafkaJsonMessageSender(KafkaProducer<String, String> producer) {
+ private final ExceptionEventSender eventSender;
+
+ public KafkaJsonMessageSender(KafkaProducer<String, String> producer, ExceptionEventSender eventSender) {
+ this.eventSender = eventSender;
this.producer = requireNonNull(producer);
}
@@ -57,6 +61,7 @@ public class KafkaJsonMessageSender<T> implements JsonMessageSender<T> {
RecordMetadata metadata = producer.send(record).get();
LOG.info(format("Sent JSON to %s", metadata));
} catch (Exception e) {
+ eventSender.send(e);
throw new MessagingException(format("Failed to send JSON message on topic %s", topic), e);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 8dfec8f..8046901 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.sling.distribution.journal.messages.Types;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageInfo;
import com.google.protobuf.ByteString;
@@ -40,6 +41,7 @@ import static java.lang.Integer.parseInt;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.time.Duration.ofHours;
+import static java.util.Objects.requireNonNull;
public class KafkaMessagePoller implements Closeable {
@@ -53,8 +55,11 @@ public class KafkaMessagePoller implements Closeable {
private final String types;
- public KafkaMessagePoller(KafkaConsumer<String, byte[]> consumer, HandlerAdapter<?>... handlerAdapters) {
- this.consumer = consumer;
+ private final ExceptionEventSender eventSender;
+
+ public KafkaMessagePoller(KafkaConsumer<String, byte[]> consumer, ExceptionEventSender eventSender, HandlerAdapter<?>... handlerAdapters) {
+ this.consumer = requireNonNull(consumer);
+ this.eventSender = requireNonNull(eventSender);
for (HandlerAdapter<?> handlerAdapter : handlerAdapters) {
handlers.put(handlerAdapter.getType(), handlerAdapter);
}
@@ -112,6 +117,7 @@ public class KafkaMessagePoller implements Closeable {
try {
handleRecord(adapter, record);
} catch (Exception e) {
+ eventSender.send(e);
String msg = format("Error consuming message for types %s", types);
LOG.warn(msg);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
index a29bfe4..1c96af9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import org.apache.sling.distribution.journal.messages.Types;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingException;
import com.google.protobuf.GeneratedMessage;
@@ -44,7 +45,10 @@ public class KafkaMessageSender<T extends GeneratedMessage> implements MessageSe
private final KafkaProducer<String, byte[]> producer;
- public KafkaMessageSender(KafkaProducer<String, byte[]> producer) {
+ private final ExceptionEventSender eventSender;
+
+ public KafkaMessageSender(KafkaProducer<String, byte[]> producer, ExceptionEventSender eventSender) {
+ this.eventSender = requireNonNull(eventSender);
this.producer = requireNonNull(producer);
}
@@ -63,6 +67,7 @@ public class KafkaMessageSender<T extends GeneratedMessage> implements MessageSe
RecordMetadata metadata = producer.send(record).get();
LOG.info(format("Sent to %s", metadata));
} catch (InterruptedException | ExecutionException e) {
+ eventSender.send(e);
throw new MessagingException(format("Failed to send message on topic %s", topic), e);
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
index 5ed0699..cafdd69 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
@@ -39,15 +39,21 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.service.event.EventAdmin;
@RunWith(MockitoJUnitRunner.class)
public class KafkaClientProviderTest {
private static final String TOPIC = "topic";
+ @Mock
+ EventAdmin eventAdmin;
+
+ @InjectMocks
@Spy
private KafkaClientProvider provider;
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java
index eee5a22..5746269 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.messages.Messages.ClearCommand;
import org.apache.sling.distribution.journal.messages.Messages.PingMessage;
@@ -44,6 +45,9 @@ public class KafkaMessageSenderTest {
private static final String TOPIC = "topic";
@Mock
+ private ExceptionEventSender eventSender;
+
+ @Mock
private KafkaProducer<String, byte[]> producer;
@InjectMocks
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
index 87a7e6a..851fdfe 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
@@ -31,7 +31,6 @@ import org.junit.runner.Description;
import org.junit.runners.model.Statement;
public class KafkaRule implements TestRule {
-
private KafkaClientProvider provider;
@Override