You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/02 12:56:41 UTC
[sling-org-apache-sling-distribution-journal-kafka] branch master
updated (a1481a0 -> 5b01420)
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git.
from a1481a0 SLING-8533 - Send events for exceptions in kafka send and receive
new 866da21 Reinterrupt thread
new 5b01420 Avoid volatile
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../distribution/journal/kafka/KafkaClientProvider.java | 6 +++---
.../distribution/journal/kafka/KafkaMessageSender.java | 14 ++++++++++----
2 files changed, 13 insertions(+), 7 deletions(-)
[sling-org-apache-sling-distribution-journal-kafka] 01/02:
Reinterrupt thread
Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git
commit 866da21ab4071d19aa790131149e90a62832b6e9
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Jul 2 14:40:02 2019 +0200
Reinterrupt thread
---
.../distribution/journal/kafka/KafkaMessageSender.java | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
index 1c96af9..870f444 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
@@ -19,7 +19,6 @@
package org.apache.sling.distribution.journal.kafka;
import java.util.Arrays;
-import java.util.concurrent.ExecutionException;
import org.apache.sling.distribution.journal.messages.Types;
import org.apache.sling.distribution.journal.ExceptionEventSender;
@@ -66,12 +65,19 @@ public class KafkaMessageSender<T extends GeneratedMessage> implements MessageSe
try {
RecordMetadata metadata = producer.send(record).get();
LOG.info(format("Sent to %s", metadata));
- } catch (InterruptedException | ExecutionException e) {
- eventSender.send(e);
- throw new MessagingException(format("Failed to send message on topic %s", topic), e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ handleException(topic, e);
+ } catch (Exception e) {
+ handleException(topic, e);
}
}
+ private void handleException(String topic, Exception e) {
+ eventSender.send(e);
+ throw new MessagingException(format("Failed to send message on topic %s", topic), e);
+ }
+
private Iterable<Header> toHeaders(int type, int version) {
return Arrays.asList(toHeader("type", type),
toHeader("version",version));
[sling-org-apache-sling-distribution-journal-kafka] 02/02: Avoid
volatile
Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git
commit 5b01420faa81bcfbf44c57602e05e0237635238f
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Jul 2 14:50:44 2019 +0200
Avoid volatile
---
.../sling/distribution/journal/kafka/KafkaClientProvider.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index 4b31bbb..78046c0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -90,9 +90,9 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
private ExceptionEventSender eventSender;
- private volatile KafkaProducer<String, byte[]> rawProducer = null;
+ private KafkaProducer<String, byte[]> rawProducer = null;
- private volatile KafkaProducer<String, String> jsonProducer = null;
+ private KafkaProducer<String, String> jsonProducer = null;
private String kafkaBootstrapServers;
@@ -118,7 +118,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
}
@Deactivate
- public void close() {
+ public synchronized void close() {
closeQuietly(rawProducer);
closeQuietly(jsonProducer);
}