You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2019/05/08 12:49:51 UTC

[sling-org-apache-sling-distribution-journal-kafka] branch master updated: SLING-8400 - Simplify creation of KafkaMessageInfo (#2)

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 910293e  SLING-8400 - Simplify creation of KafkaMessageInfo (#2)
910293e is described below

commit 910293e671a51d72aa1baf82cbf78fd67f016b69
Author: Christian Schneider <ch...@die-schneider.net>
AuthorDate: Wed May 8 14:49:47 2019 +0200

    SLING-8400 - Simplify creation of KafkaMessageInfo (#2)
---
 .../distribution/journal/kafka/KafkaJsonMessagePoller.java     |  6 +-----
 .../sling/distribution/journal/kafka/KafkaMessageInfo.java     | 10 +++++-----
 .../sling/distribution/journal/kafka/KafkaMessagePoller.java   |  6 +-----
 3 files changed, 7 insertions(+), 15 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
index 0eec98f..7810368 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
@@ -90,11 +90,7 @@ public class KafkaJsonMessagePoller<T> implements Closeable {
     }
 
     private void handleRecord(ConsumerRecord<String, String> record) {
-        MessageInfo info = new KafkaMessageInfo(
-                record.topic(),
-                record.partition(),
-                record.offset(),
-                record.timestamp());
+        MessageInfo info = new KafkaMessageInfo(record);
         String payload = record.value();
         try {
             T message = reader.readValue(payload);
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
index 4eebfd0..9d49562 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
@@ -27,11 +27,11 @@ public class KafkaMessageInfo implements MessageInfo {
     private final long offset;
     private final long createTime;
 
-    public KafkaMessageInfo(String topic, int partition, long offset, long createTime) {
-        this.topic = topic;
-        this.partition = partition;
-        this.offset = offset;
-        this.createTime = createTime;
+    public KafkaMessageInfo(ConsumerRecord<String, ?> record) {
+        this.topic = record.topic();
+        this.partition = record.partition();
+        this.offset = record.offset();
+        this.createTime = record.timestamp();
     }
 
     public String getTopic() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
index 6b22362..687d253 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -114,11 +114,7 @@ public class KafkaMessagePoller implements Closeable {
     }
 
     private void handleRecord(HandlerAdapter<?> adapter, ConsumerRecord<String, byte[]> record) throws Exception {
-        MessageInfo info = new KafkaMessageInfo(
-                record.topic(),
-                record.partition(),
-                record.offset(),
-                record.timestamp());
+        MessageInfo info = new KafkaMessageInfo(record);
         ByteString payload = ByteString.copyFrom(record.value());
         adapter.handle(info, payload);
     }