You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2021/12/20 13:06:35 UTC

[sling-org-apache-sling-distribution-journal-kafka] branch SLING-11028 created (now 3d82acf)

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a change to branch SLING-11028
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git.


      at 3d82acf  SLING-11028 - Allow to assign to relative offset

This branch includes the following new commits:

     new 3d82acf  SLING-11028 - Allow to assign to relative offset

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[sling-org-apache-sling-distribution-journal-kafka] 01/01: SLING-11028 - Allow to assign to relative offset

Posted by cs...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-11028
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git

commit 3d82acfe3c1fa4e6b506980fdf53080ac8d39595
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Dec 20 14:06:10 2021 +0100

    SLING-11028 - Allow to assign to relative offset
---
 pom.xml                                            |  2 +-
 .../journal/kafka/KafkaClientProvider.java         | 47 +++++++++++++++++-----
 .../journal/kafka/KafkaClientProviderTest.java     | 22 +++++++++-
 .../journal/kafka/KafkaMessageInfoTest.java        |  2 +-
 .../distribution/journal/kafka/MessagingTest.java  | 35 +++++++++++++++-
 5 files changed, 95 insertions(+), 13 deletions(-)

diff --git a/pom.xml b/pom.xml
index bb39376..0b7e2c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.distribution.journal.messages</artifactId>
-            <version>0.3.0</version>
+            <version>0.3.1-SNAPSHOT</version>
         </dependency>
         <!-- OSGi -->
         <dependency>
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index ec31888..3c16c6a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -138,7 +138,9 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
         Collection<TopicPartition> topicPartitions = singleton(topicPartition);
         consumer.assign(topicPartitions);
         if (assign != null) {
-            consumer.seek(topicPartition, offset(assign));
+            AssignDetails assignDetails = new AssignDetails(assign);
+            long offset = assignDetails.getOffset(consumer, topicPartition);
+            consumer.seek(topicPartition, offset);
         } else if (reset == Reset.earliest) {
             consumer.seekToBeginning(topicPartitions);
         } else {
@@ -181,6 +183,11 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
     public String assignTo(long offset) {
         return format("%s:%s", PARTITION, offset);
     }
+    
+    @Override
+    public String assignTo(Reset reset, long relativeOffset) {
+        return format("%s:%s:%d", PARTITION, reset.name(), relativeOffset);
+    }
 
     protected <T> KafkaConsumer<String, T> createConsumer(Class<? extends Deserializer<?>> deserializer, Reset reset) {
         String groupId = UUID.randomUUID().toString();
@@ -236,16 +243,38 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
         return config;
     }
 
-    private long offset(String assign) {
-        String[] chunks = assign.split(":");
-        if (chunks.length != 2) {
-            throw new IllegalArgumentException(format("Illegal assign %s", assign));
-        }
-        return Long.parseLong(chunks[1]);
-    }
-
     @Override
     public URI getServerUri() {
         return serverUri;
     }
+    
+    static class AssignDetails {
+        private final Reset reset;
+        private final long offset;
+
+        AssignDetails(String assign) {
+            String[] chunks = assign.split(":");
+            if (chunks.length == 3) {
+                String reset = chunks[1];
+                this.reset = Reset.valueOf(reset);
+                offset = Long.parseLong(chunks[2]);
+            } else if (chunks.length == 2) {
+                reset = null;
+                offset = Long.parseLong(chunks[1]);
+            } else {
+                throw new IllegalArgumentException(format("Illegal assign %s", assign));
+            }
+        }
+
+        long getOffset(KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
+            Collection<TopicPartition> partitions = singleton(topicPartition);
+            if (reset == Reset.earliest) {
+                return consumer.beginningOffsets(partitions).get(topicPartition) + offset;
+            } else if (reset == Reset.latest) {
+                return consumer.endOffsets(partitions).get(topicPartition) + offset;
+            } else {
+                return offset;
+            }
+        }
+    }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
index 358a760..738c175 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
@@ -21,10 +21,11 @@ package org.apache.sling.distribution.journal.kafka;
 import static java.util.Collections.emptyMap;
 import static org.apache.sling.distribution.journal.kafka.util.KafkaEndpointBuilder.buildKafkaEndpoint;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
+import java.net.URI;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -33,6 +34,7 @@ import java.util.Map;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.Reset;
 import org.junit.Assert;
@@ -116,4 +118,22 @@ public class KafkaClientProviderTest {
         String assign = provider.assignTo(1l);
         assertThat(assign, equalTo("0:1"));
     }
+    
+    @Test
+    public void testAssignToRelative() throws Exception {
+        String assign = provider.assignTo(Reset.latest, -1l);
+        assertThat(assign, equalTo("0:latest:-1"));
+    }
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidAssign() throws Exception {
+        HandlerAdapter<?> handler = Mockito.mock(HandlerAdapter.class);
+        provider.createPoller(TOPIC, Reset.latest, "", handler);
+    }
+    
+    @Test
+    public void testGeServerURI() throws Exception {
+        URI serverUri = provider.getServerUri();
+        assertThat(serverUri.toString(), equalTo("localhost:9092"));
+    }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
index 097919e..a3dec00 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
@@ -19,7 +19,7 @@
 package org.apache.sling.distribution.journal.kafka;
 
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.when;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
index 0989667..e12c420 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
@@ -18,9 +18,9 @@
  */
 package org.apache.sling.distribution.journal.kafka;
 
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.io.Closeable;
@@ -112,6 +112,39 @@ public class MessagingTest {
             assertReceived("Should see message as we fall back to earliest");
         }
     }
+    
+    
+    @Test
+    public void testAssignRelativeLatest() throws Exception {
+        DiscoveryMessage msg = createMessage();
+        MessageSender<DiscoveryMessage> messageSender = provider.createSender(topicName);
+        messageSender.send(msg);
+        
+        String assign1 = provider.assignTo(Reset.latest, -1);
+        try (Closeable poller = provider.createPoller(topicName, Reset.latest, assign1, handler)) {
+            assertReceived("Starting from latest:-1 .. should see our message");
+        }
+        String assign2 = provider.assignTo(Reset.latest, 0);
+        try (Closeable poller1 = provider.createPoller(topicName, Reset.latest, assign2, handler)) {
+            assertNotReceived("Should not see message as we fall back to latest");
+        }
+    }
+    
+    @Test
+    public void testAssignRelativeEarliest() throws Exception {
+        DiscoveryMessage msg = createMessage();
+        MessageSender<DiscoveryMessage> messageSender = provider.createSender(topicName);
+        messageSender.send(msg);
+        
+        String assign1 = provider.assignTo(Reset.earliest, 0);
+        try (Closeable poller = provider.createPoller(topicName, Reset.latest, assign1, handler)) {
+            assertReceived("Starting from latest:-1 .. should see our message");
+        }
+        String assign2 = provider.assignTo(Reset.earliest, 1);
+        try (Closeable poller1 = provider.createPoller(topicName, Reset.latest, assign2, handler)) {
+            assertNotReceived("Should not see message as we fall back to latest");
+        }
+    }
 
     private DiscoveryMessage createMessage() {
         return DiscoveryMessage.builder()