You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2019/05/08 12:49:51 UTC
[sling-org-apache-sling-distribution-journal-kafka] branch master updated: SLING-8400 - Simplify creation of KafkaMessageInfo (#2)
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 910293e SLING-8400 - Simplify creation of KafkaMessageInfo (#2)
910293e is described below
commit 910293e671a51d72aa1baf82cbf78fd67f016b69
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Wed May 8 14:49:47 2019 +0200
SLING-8400 - Simplify creation of KafkaMessageInfo (#2)
---
.../distribution/journal/kafka/KafkaJsonMessagePoller.java | 6 +-----
.../sling/distribution/journal/kafka/KafkaMessageInfo.java | 10 +++++-----
.../sling/distribution/journal/kafka/KafkaMessagePoller.java | 6 +-----
3 files changed, 7 insertions(+), 15 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
index 0eec98f..7810368 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
@@ -90,11 +90,7 @@ public class KafkaJsonMessagePoller<T> implements Closeable {
}
private void handleRecord(ConsumerRecord<String, String> record) {
- MessageInfo info = new KafkaMessageInfo(
- record.topic(),
- record.partition(),
- record.offset(),
- record.timestamp());
+ MessageInfo info = new KafkaMessageInfo(record);
String payload = record.value();
try {
T message = reader.readValue(payload);
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
index 4eebfd0..9d49562 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
@@ -27,11 +27,11 @@ public class KafkaMessageInfo implements MessageInfo {
private final long offset;
private final long createTime;
- public KafkaMessageInfo(String topic, int partition, long offset, long createTime) {
- this.topic = topic;
- this.partition = partition;
- this.offset = offset;
- this.createTime = createTime;
+ public KafkaMessageInfo(ConsumerRecord<String, ?> record) {
+ this.topic = record.topic();
+ this.partition = record.partition();
+ this.offset = record.offset();
+ this.createTime = record.timestamp();
}
public String getTopic() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 6b22362..687d253 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -114,11 +114,7 @@ public class KafkaMessagePoller implements Closeable {
}
private void handleRecord(HandlerAdapter<?> adapter, ConsumerRecord<String, byte[]> record) throws Exception {
- MessageInfo info = new KafkaMessageInfo(
- record.topic(),
- record.partition(),
- record.offset(),
- record.timestamp());
+ MessageInfo info = new KafkaMessageInfo(record);
ByteString payload = ByteString.copyFrom(record.value());
adapter.handle(info, payload);
}