You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2021/12/20 13:06:36 UTC

[sling-org-apache-sling-distribution-journal-kafka] 01/01: SLING-11028 - Allow to assign to relative offset

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-11028
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git

commit 3d82acfe3c1fa4e6b506980fdf53080ac8d39595
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Dec 20 14:06:10 2021 +0100

    SLING-11028 - Allow to assign to relative offset
---
 pom.xml                                            |  2 +-
 .../journal/kafka/KafkaClientProvider.java         | 47 +++++++++++++++++-----
 .../journal/kafka/KafkaClientProviderTest.java     | 22 +++++++++-
 .../journal/kafka/KafkaMessageInfoTest.java        |  2 +-
 .../distribution/journal/kafka/MessagingTest.java  | 35 +++++++++++++++-
 5 files changed, 95 insertions(+), 13 deletions(-)

diff --git a/pom.xml b/pom.xml
index bb39376..0b7e2c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.distribution.journal.messages</artifactId>
-            <version>0.3.0</version>
+            <version>0.3.1-SNAPSHOT</version>
         </dependency>
         <!-- OSGi -->
         <dependency>
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index ec31888..3c16c6a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -138,7 +138,9 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
         Collection<TopicPartition> topicPartitions = singleton(topicPartition);
         consumer.assign(topicPartitions);
         if (assign != null) {
-            consumer.seek(topicPartition, offset(assign));
+            AssignDetails assignDetails = new AssignDetails(assign);
+            long offset = assignDetails.getOffset(consumer, topicPartition);
+            consumer.seek(topicPartition, offset);
         } else if (reset == Reset.earliest) {
             consumer.seekToBeginning(topicPartitions);
         } else {
@@ -181,6 +183,11 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
     public String assignTo(long offset) {
         return format("%s:%s", PARTITION, offset);
     }
+    
+    @Override
+    public String assignTo(Reset reset, long relativeOffset) {
+        return format("%s:%s:%d", PARTITION, reset.name(), relativeOffset);
+    }
 
     protected <T> KafkaConsumer<String, T> createConsumer(Class<? extends Deserializer<?>> deserializer, Reset reset) {
         String groupId = UUID.randomUUID().toString();
@@ -236,16 +243,38 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
         return config;
     }
 
-    private long offset(String assign) {
-        String[] chunks = assign.split(":");
-        if (chunks.length != 2) {
-            throw new IllegalArgumentException(format("Illegal assign %s", assign));
-        }
-        return Long.parseLong(chunks[1]);
-    }
-
     @Override
     public URI getServerUri() {
         return serverUri;
     }
+    
+    /** Parsed assign string: "partition:offset" (absolute) or "partition:reset:offset" (relative to earliest/latest). */
+    static class AssignDetails {
+        private final Reset reset;
+        private final long offset;
+
+        AssignDetails(String assign) {
+            String[] chunks = assign.split(":");
+            if (chunks.length == 3) {
+                this.reset = Reset.valueOf(chunks[1]);
+                this.offset = Long.parseLong(chunks[2]);
+            } else if (chunks.length == 2) {
+                this.reset = null; // no reset given: the offset is absolute
+                this.offset = Long.parseLong(chunks[1]);
+            } else {
+                throw new IllegalArgumentException(format("Illegal assign %s", assign));
+            }
+        }
+
+        /** Resolves the offset to seek to; relative results are clamped at 0 because seek rejects negative offsets. */
+        long getOffset(KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
+            Collection<TopicPartition> partitions = singleton(topicPartition);
+            if (reset == Reset.earliest) {
+                return Math.max(0L, consumer.beginningOffsets(partitions).get(topicPartition) + offset);
+            } else if (reset == Reset.latest) {
+                return Math.max(0L, consumer.endOffsets(partitions).get(topicPartition) + offset);
+            }
+            return offset;
+        }
+    }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
index 358a760..738c175 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
@@ -21,10 +21,11 @@ package org.apache.sling.distribution.journal.kafka;
 import static java.util.Collections.emptyMap;
 import static org.apache.sling.distribution.journal.kafka.util.KafkaEndpointBuilder.buildKafkaEndpoint;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
+import java.net.URI;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -33,6 +34,7 @@ import java.util.Map;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.Reset;
 import org.junit.Assert;
@@ -116,4 +118,22 @@ public class KafkaClientProviderTest {
         String assign = provider.assignTo(1l);
         assertThat(assign, equalTo("0:1"));
     }
+    
+    @Test
+    public void testAssignToRelative() throws Exception {
+        String assign = provider.assignTo(Reset.latest, -1l);
+        assertThat(assign, equalTo("0:latest:-1"));
+    }
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidAssign() throws Exception {
+        HandlerAdapter<?> handler = Mockito.mock(HandlerAdapter.class);
+        provider.createPoller(TOPIC, Reset.latest, "", handler);
+    }
+    
+    @Test
+    public void testGeServerURI() throws Exception {
+        URI serverUri = provider.getServerUri();
+        assertThat(serverUri.toString(), equalTo("localhost:9092"));
+    }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
index 097919e..a3dec00 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
@@ -19,7 +19,7 @@
 package org.apache.sling.distribution.journal.kafka;
 
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.when;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
index 0989667..e12c420 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
@@ -18,9 +18,9 @@
  */
 package org.apache.sling.distribution.journal.kafka;
 
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.io.Closeable;
@@ -112,6 +112,39 @@ public class MessagingTest {
             assertReceived("Should see message as we fall back to earliest");
         }
     }
+    
+    
+    @Test
+    public void testAssignRelativeLatest() throws Exception {
+        DiscoveryMessage msg = createMessage();
+        MessageSender<DiscoveryMessage> messageSender = provider.createSender(topicName);
+        messageSender.send(msg);
+        // latest:-1 starts one record before the end of the partition, latest:0 starts at the end.
+        String assign1 = provider.assignTo(Reset.latest, -1);
+        try (Closeable poller = provider.createPoller(topicName, Reset.latest, assign1, handler)) {
+            assertReceived("Starting from latest:-1 .. should see our message");
+        }
+        String assign2 = provider.assignTo(Reset.latest, 0);
+        try (Closeable poller1 = provider.createPoller(topicName, Reset.latest, assign2, handler)) {
+            assertNotReceived("Starting from latest:0 .. should not see our message");
+        }
+    }
+    
+    @Test
+    public void testAssignRelativeEarliest() throws Exception {
+        DiscoveryMessage msg = createMessage();
+        MessageSender<DiscoveryMessage> messageSender = provider.createSender(topicName);
+        messageSender.send(msg);
+        // earliest:0 starts at the first record; earliest:1 skips it (assumes the topic holds only this message).
+        String assign1 = provider.assignTo(Reset.earliest, 0);
+        try (Closeable poller = provider.createPoller(topicName, Reset.latest, assign1, handler)) {
+            assertReceived("Starting from earliest:0 .. should see our message");
+        }
+        String assign2 = provider.assignTo(Reset.earliest, 1);
+        try (Closeable poller1 = provider.createPoller(topicName, Reset.latest, assign2, handler)) {
+            assertNotReceived("Starting from earliest:1 .. should not see our message");
+        }
+    }
 
     private DiscoveryMessage createMessage() {
         return DiscoveryMessage.builder()