You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2020/04/22 14:59:14 UTC
[sling-org-apache-sling-distribution-journal-messages] branch
master updated: SLING-9385 - Introduce properties
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-messages.git
The following commit(s) were added to refs/heads/master by this push:
new 6b0d26b SLING-9385 - Introduce properties
6b0d26b is described below
commit 6b0d26b2738bef284e76e3657ac7f57c7695b6d9
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Wed Apr 22 16:59:00 2020 +0200
SLING-9385 - Introduce properties
---
.../sling/distribution/journal/MessageInfo.java | 6 ++---
.../sling/distribution/journal/MessageSender.java | 8 ++++--
.../distribution/journal/MessagingProvider.java | 31 +++++++++++++++-------
3 files changed, 30 insertions(+), 15 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java b/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java
index 29c1a57..948489d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessageInfo.java
@@ -18,6 +18,8 @@
*/
package org.apache.sling.distribution.journal;
+import java.util.Map;
+
public interface MessageInfo {
String getTopic();
@@ -28,7 +30,5 @@ public interface MessageInfo {
long getCreateTime();
- String getOrg();
-
- String getSource();
+ Map<String, String> getProps();
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/MessageSender.java b/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
index 83a9eaa..0a7f668 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessageSender.java
@@ -18,10 +18,14 @@
*/
package org.apache.sling.distribution.journal;
+import java.util.Map;
+
import com.google.protobuf.GeneratedMessage;
public interface MessageSender<T extends GeneratedMessage> {
- void send(String topic, T payload) throws MessagingException;
+ void send(String topic, T payload) throws MessagingException;
+
+ void send(String topic, T payload, Map<String, String> properties) throws MessagingException;
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java b/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
index c73dfb1..c4d49f9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/MessagingProvider.java
@@ -19,27 +19,38 @@
package org.apache.sling.distribution.journal;
import java.io.Closeable;
+import java.util.function.Consumer;
import com.google.protobuf.GeneratedMessage;
public interface MessagingProvider {
- <T extends GeneratedMessage> MessageSender<T> createSender();
+ <T extends GeneratedMessage> MessageSender<T> createSender();
+
+ /**
+ * Create a sender for a fixed topic
+ *
+ * @param <T>
+ * @param topic
+ * @return
+ */
+ default <T extends GeneratedMessage> Consumer<T> createSender(String topic) {
+ MessageSender<GeneratedMessage> sender = createSender();
+ return payload -> sender.send(topic, payload);
+ }
- <T> Closeable createPoller(String topicName, Reset reset, HandlerAdapter<?>... adapters);
+ <T> Closeable createPoller(String topicName, Reset reset, HandlerAdapter<?>... adapters);
- Closeable createPoller(String topicName, Reset reset, String assign,
- HandlerAdapter<?>... adapters);
+ Closeable createPoller(String topicName, Reset reset, String assign, HandlerAdapter<?>... adapters);
- <T> JsonMessageSender<T> createJsonSender();
+ <T> JsonMessageSender<T> createJsonSender();
- <T> Closeable createJsonPoller(String topicName, Reset reset, MessageHandler<T> handler,
- Class<T> type);
+ <T> Closeable createJsonPoller(String topicName, Reset reset, MessageHandler<T> handler, Class<T> type);
- void assertTopic(String topic) throws MessagingException;
+ void assertTopic(String topic) throws MessagingException;
- long retrieveOffset(String topicName, Reset reset);
+ long retrieveOffset(String topicName, Reset reset);
- String assignTo(long offset);
+ String assignTo(long offset);
}
\ No newline at end of file