You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/04/22 14:59:14 UTC

[sling-org-apache-sling-distribution-journal-messages] branch master updated: SLING-9385 - Introduce properties

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-messages.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b0d26b  SLING-9385 - Introduce properties
6b0d26b is described below

commit 6b0d26b2738bef284e76e3657ac7f57c7695b6d9
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Apr 22 16:59:00 2020 +0200

    SLING-9385 - Introduce properties
---
 .../sling/distribution/journal/MessageInfo.java    |  6 ++---
 .../sling/distribution/journal/MessageSender.java  |  8 ++++--
 .../distribution/journal/MessagingProvider.java    | 31 +++++++++++++++-------
 3 files changed, 30 insertions(+), 15 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java b/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java
index 29c1a57..948489d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java
@@ -18,6 +18,8 @@
  */
 package org.apache.sling.distribution.journal;
 
+import java.util.Map;
+
 public interface MessageInfo {
 
     String getTopic();
@@ -28,7 +30,5 @@ public interface MessageInfo {
 
     long getCreateTime();
     
-    String getOrg();
-    
-    String getSource();
+    Map<String, String> getProps();
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/MessageSender.java b/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
index 83a9eaa..0a7f668 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
@@ -18,10 +18,14 @@
  */
 package org.apache.sling.distribution.journal;
 
+import java.util.Map;
+
 import com.google.protobuf.GeneratedMessage;
 
 public interface MessageSender<T extends GeneratedMessage> {
 
-	void send(String topic, T payload) throws MessagingException;
+    void send(String topic, T payload) throws MessagingException;
+    
+    void send(String topic, T payload, Map<String, String> properties) throws MessagingException;
 
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java b/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
index c73dfb1..c4d49f9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
@@ -19,27 +19,38 @@
 package org.apache.sling.distribution.journal;
 
 import java.io.Closeable;
+import java.util.function.Consumer;
 
 import com.google.protobuf.GeneratedMessage;
 
 public interface MessagingProvider {
 
-	<T extends GeneratedMessage> MessageSender<T> createSender();
+    <T extends GeneratedMessage> MessageSender<T> createSender();
+    
+    /**
+     * Create a sender for a fixed topic
+     * 
+     * @param <T>
+     * @param topic
+     * @return
+     */
+    default <T extends GeneratedMessage> Consumer<T> createSender(String topic) {
+        MessageSender<GeneratedMessage> sender = createSender();
+        return payload -> sender.send(topic, payload);
+    }
 
-	<T> Closeable createPoller(String topicName, Reset reset, HandlerAdapter<?>... adapters);
+    <T> Closeable createPoller(String topicName, Reset reset, HandlerAdapter<?>... adapters);
 
-	Closeable createPoller(String topicName, Reset reset, String assign,
-						   HandlerAdapter<?>... adapters);
+    Closeable createPoller(String topicName, Reset reset, String assign, HandlerAdapter<?>... adapters);
 
-	<T> JsonMessageSender<T> createJsonSender();
+    <T> JsonMessageSender<T> createJsonSender();
 
-	<T> Closeable createJsonPoller(String topicName, Reset reset, MessageHandler<T> handler,
-								   Class<T> type);
+    <T> Closeable createJsonPoller(String topicName, Reset reset, MessageHandler<T> handler, Class<T> type);
 
-	void assertTopic(String topic) throws MessagingException;
+    void assertTopic(String topic) throws MessagingException;
 
-	long retrieveOffset(String topicName, Reset reset);
+    long retrieveOffset(String topicName, Reset reset);
 
-	String assignTo(long offset);
+    String assignTo(long offset);
 
 }
\ No newline at end of file