You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/02 12:56:41 UTC

[sling-org-apache-sling-distribution-journal-kafka] branch master updated (a1481a0 -> 5b01420)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git.


    from a1481a0  SLING-8533 - Send events for exceptions in kafka send and receive
     new 866da21  Reinterrupt thread
     new 5b01420  Avoid volatile

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../distribution/journal/kafka/KafkaClientProvider.java    |  6 +++---
 .../distribution/journal/kafka/KafkaMessageSender.java     | 14 ++++++++++----
 2 files changed, 13 insertions(+), 7 deletions(-)


[sling-org-apache-sling-distribution-journal-kafka] 01/02: Reinterrupt thread

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git

commit 866da21ab4071d19aa790131149e90a62832b6e9
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Jul 2 14:40:02 2019 +0200

    Reinterrupt thread
---
 .../distribution/journal/kafka/KafkaMessageSender.java     | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
index 1c96af9..870f444 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
@@ -19,7 +19,6 @@
 package org.apache.sling.distribution.journal.kafka;
 
 import java.util.Arrays;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.sling.distribution.journal.messages.Types;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
@@ -66,12 +65,19 @@ public class KafkaMessageSender<T extends GeneratedMessage> implements MessageSe
         try {
             RecordMetadata metadata = producer.send(record).get();
             LOG.info(format("Sent to %s", metadata));
-        } catch (InterruptedException | ExecutionException e) {
-            eventSender.send(e);
-            throw new MessagingException(format("Failed to send message on topic %s", topic), e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            handleException(topic, e);
+        } catch (Exception e) {
+            handleException(topic, e);
         }
     }
 
+    private void handleException(String topic, Exception e) {
+        eventSender.send(e);
+        throw new MessagingException(format("Failed to send message on topic %s", topic), e);
+    }
+
     private Iterable<Header> toHeaders(int type, int version) {
         return Arrays.asList(toHeader("type", type),
                 toHeader("version",version));


[sling-org-apache-sling-distribution-journal-kafka] 02/02: Avoid volatile

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git

commit 5b01420faa81bcfbf44c57602e05e0237635238f
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Tue Jul 2 14:50:44 2019 +0200

    Avoid volatile
---
 .../sling/distribution/journal/kafka/KafkaClientProvider.java       | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index 4b31bbb..78046c0 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -90,9 +90,9 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
 
     private ExceptionEventSender eventSender;
 
-    private volatile KafkaProducer<String, byte[]> rawProducer = null;
+    private KafkaProducer<String, byte[]> rawProducer = null;
 
-    private volatile KafkaProducer<String, String> jsonProducer = null;
+    private KafkaProducer<String, String> jsonProducer = null;
 
     private String kafkaBootstrapServers;
 
@@ -118,7 +118,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
     }
     
     @Deactivate
-    public void close() {
+    public synchronized void close() {
         closeQuietly(rawProducer);
         closeQuietly(jsonProducer);
     }