You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2019/07/03 10:38:28 UTC

[sling-org-apache-sling-distribution-journal-kafka] branch master updated (2db90bc -> 91c62a7)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git.


    from 2db90bc  SLING-8554 - Remove final
     new 9ddfdbc  SLING-8554 - Refactor to avoid null
     new 91c62a7  SLING-8554 - Add second message to test while loop in poller

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../journal/kafka/KafkaMessagePoller.java          | 61 ++++++++++------------
 .../distribution/journal/kafka/MessagingTest.java  | 35 ++++++-------
 2 files changed, 45 insertions(+), 51 deletions(-)


[sling-org-apache-sling-distribution-journal-kafka] 01/02: SLING-8554 - Refactor to avoid null

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git

commit 9ddfdbc554411eb8449bd18068f35624a628cded
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Jul 3 12:35:47 2019 +0200

    SLING-8554 - Refactor to avoid null
---
 .../journal/kafka/KafkaMessagePoller.java          | 61 ++++++++++------------
 1 file changed, 29 insertions(+), 32 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 8046901..3e20d75 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.sling.distribution.journal.messages.Types;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
@@ -78,7 +79,8 @@ public class KafkaMessagePoller implements Closeable {
         LOG.info("Start poller for types {}", types);
         try {
             while(running) {
-                consume();
+                consumer.poll(ofHours(1))
+                    .forEach(this::handleRecord);
             }
         } catch (WakeupException e) {
             if (running) {
@@ -96,47 +98,42 @@ public class KafkaMessagePoller implements Closeable {
         LOG.info("Stop poller for types {}", types);
     }
 
-    private void consume() {
-        consumer.poll(ofHours(1))
-                .forEach(this::handleRecord);
+    private void handleRecord(ConsumerRecord<String, byte[]> record) {
+        getHandler(record)
+            .ifPresent(handler->handleRecord(handler, record));
     }
 
-    private void handleRecord(ConsumerRecord<String, byte[]> record) {
-        Class<?> type;
-        HandlerAdapter<?> adapter;
+    private Optional<HandlerAdapter<?>> getHandler(ConsumerRecord<String, byte[]> record) {
         try {
-            type = Types.getType(
-                    parseInt(getHeaderValue(record.headers(), "type")),
-                    parseInt(getHeaderValue(record.headers(), "version")));
-            adapter = handlers.get(type);
-        } catch (RuntimeException e) {
-            LOG.info("Ignoring unknown message");
-            return;
-        }
-        if (adapter != null) {
-            try {
-                handleRecord(adapter, record);
-            } catch (Exception e) {
-                eventSender.send(e);
-                String msg = format("Error consuming message for types %s", types);
-                LOG.warn(msg);
+            int type = parseInt(getHeaderValue(record.headers(), "type"));
+            int version = parseInt(getHeaderValue(record.headers(), "version"));
+            Class<?> messageClass = Types.getType(type, version);
+            Optional<HandlerAdapter<?>> handler = Optional.ofNullable(handlers.get(messageClass));
+            if (!handler.isPresent()) {
+                LOG.debug("No handler registered for type {}", messageClass.getName());
             }
-        } else {
-            LOG.debug("No handler registered for type {}", type.getName());
+            return handler;
+        } catch (RuntimeException e) {
+            LOG.info("No handler found for headers {}.", record.headers(), e);
+            return Optional.empty();
         }
     }
 
-    private void handleRecord(HandlerAdapter<?> adapter, ConsumerRecord<String, byte[]> record) throws Exception {
-        MessageInfo info = new KafkaMessageInfo(record);
-        ByteString payload = ByteString.copyFrom(record.value());
-        adapter.handle(info, payload);
+    private void handleRecord(HandlerAdapter<?> handler, ConsumerRecord<String, byte[]> record) {
+        try {
+            MessageInfo info = new KafkaMessageInfo(record);
+            ByteString payload = ByteString.copyFrom(record.value());
+            handler.handle(info, payload);
+        } catch (Exception e) {
+            eventSender.send(e);
+            String msg = format("Error consuming message for types %s", types);
+            LOG.warn(msg);
+        }
     }
 
     private String getHeaderValue(Headers headers, String key) {
-        Header header = headers.lastHeader(key);
-        if (header == null) {
-            throw new IllegalArgumentException(format("Header with key %s not found", key));
-        }
+        Header header = Optional.ofNullable(headers.lastHeader(key))
+            .orElseThrow(()->new IllegalArgumentException(format("Header with key %s not found", key)));
         return new String(header.value(), UTF_8);
     }
 }


[sling-org-apache-sling-distribution-journal-kafka] 02/02: SLING-8554 - Add second message to test while loop in poller

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git

commit 91c62a795783d53cb80ebb2653e7725c28b010dc
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Jul 3 12:38:18 2019 +0200

    SLING-8554 - Add second message to test while loop in poller
---
 .../distribution/journal/kafka/MessagingTest.java  | 35 ++++++++++------------
 1 file changed, 16 insertions(+), 19 deletions(-)

diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
index 431f692..ae28d4c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
@@ -62,34 +62,19 @@ public class MessagingTest {
     public void testSendReceive() throws Exception {
         HandlerAdapter<DiscoveryMessage> handler = HandlerAdapter.create(DiscoveryMessage.class, this::handle);
         Closeable poller = provider.createPoller(topicName, Reset.earliest, handler);
-        DiscoveryMessage msg = DiscoveryMessage.newBuilder()
-                .setSubAgentName("sub1agent")
-                .setSubSlingId("subsling")
-                .setSubscriberConfiguration(SubscriberConfiguration
-                        .newBuilder()
-                        .setEditable(false)
-                        .setMaxRetries(-1)
-                        .build())
-                .build();
         MessageSender<DiscoveryMessage> messageSender = provider.createSender();
         
-        messageSender.send(topicName, msg);
+        messageSender.send(topicName, createMessage());
         assertReceived("Consumer started from earliest .. should see our message");
+        messageSender.send(topicName, createMessage());
+        assertReceived("Should also consume a second message");
 
         poller.close();
     }
     
     @Test
     public void testAssign() throws Exception {
-        DiscoveryMessage msg = DiscoveryMessage.newBuilder()
-                .setSubAgentName("sub1agent")
-                .setSubSlingId("subsling")
-                .setSubscriberConfiguration(SubscriberConfiguration
-                        .newBuilder()
-                        .setEditable(false)
-                        .setMaxRetries(-1)
-                        .build())
-                .build();
+        DiscoveryMessage msg = createMessage();
         MessageSender<DiscoveryMessage> messageSender = provider.createSender();
         messageSender.send(topicName, msg);
         
@@ -116,6 +101,18 @@ public class MessagingTest {
         }
     }
 
+    private DiscoveryMessage createMessage() {
+        return DiscoveryMessage.newBuilder()
+                .setSubAgentName("sub1agent")
+                .setSubSlingId("subsling")
+                .setSubscriberConfiguration(SubscriberConfiguration
+                        .newBuilder()
+                        .setEditable(false)
+                        .setMaxRetries(-1)
+                        .build())
+                .build();
+    }
+
     private void assertReceived(String message) throws InterruptedException {
         assertTrue(message, sem.tryAcquire(30, TimeUnit.SECONDS));
     }