You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2021/12/20 13:06:36 UTC
[sling-org-apache-sling-distribution-journal-kafka] 01/01: SLING-11028 - Allow to assign to relative offset
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch SLING-11028
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git
commit 3d82acfe3c1fa4e6b506980fdf53080ac8d39595
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Dec 20 14:06:10 2021 +0100
SLING-11028 - Allow to assign to relative offset
---
pom.xml | 2 +-
.../journal/kafka/KafkaClientProvider.java | 47 +++++++++++++++++-----
.../journal/kafka/KafkaClientProviderTest.java | 22 +++++++++-
.../journal/kafka/KafkaMessageInfoTest.java | 2 +-
.../distribution/journal/kafka/MessagingTest.java | 35 +++++++++++++++-
5 files changed, 95 insertions(+), 13 deletions(-)
diff --git a/pom.xml b/pom.xml
index bb39376..0b7e2c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.distribution.journal.messages</artifactId>
- <version>0.3.0</version>
+ <version>0.3.1-SNAPSHOT</version>
</dependency>
<!-- OSGi -->
<dependency>
diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index ec31888..3c16c6a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -138,7 +138,9 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
Collection<TopicPartition> topicPartitions = singleton(topicPartition);
consumer.assign(topicPartitions);
if (assign != null) {
- consumer.seek(topicPartition, offset(assign));
+ AssignDetails assignDetails = new AssignDetails(assign);
+ long offset = assignDetails.getOffset(consumer, topicPartition);
+ consumer.seek(topicPartition, offset);
} else if (reset == Reset.earliest) {
consumer.seekToBeginning(topicPartitions);
} else {
@@ -181,6 +183,11 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
public String assignTo(long offset) {
return format("%s:%s", PARTITION, offset);
}
+
+ @Override
+ public String assignTo(Reset reset, long relativeOffset) {
+ return format("%s:%s:%d", PARTITION, reset.name(), relativeOffset);
+ }
protected <T> KafkaConsumer<String, T> createConsumer(Class<? extends Deserializer<?>> deserializer, Reset reset) {
String groupId = UUID.randomUUID().toString();
@@ -236,16 +243,38 @@ public class KafkaClientProvider implements MessagingProvider, Closeable {
return config;
}
- private long offset(String assign) {
- String[] chunks = assign.split(":");
- if (chunks.length != 2) {
- throw new IllegalArgumentException(format("Illegal assign %s", assign));
- }
- return Long.parseLong(chunks[1]);
- }
-
@Override
public URI getServerUri() {
return serverUri;
}
+
+ static class AssignDetails {
+ private final Reset reset;
+ private final long offset;
+
+ AssignDetails(String assign) {
+ String[] chunks = assign.split(":");
+ if (chunks.length == 3) {
+ String reset = chunks[1];
+ this.reset = Reset.valueOf(reset);
+ offset = Long.parseLong(chunks[2]);
+ } else if (chunks.length == 2) {
+ reset = null;
+ offset = Long.parseLong(chunks[1]);
+ } else {
+ throw new IllegalArgumentException(format("Illegal assign %s", assign));
+ }
+ }
+
+ long getOffset(KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
+ Collection<TopicPartition> partitions = singleton(topicPartition);
+ if (reset == Reset.earliest) {
+ return consumer.beginningOffsets(partitions).get(topicPartition) + offset;
+ } else if (reset == Reset.latest) {
+ return consumer.endOffsets(partitions).get(topicPartition) + offset;
+ } else {
+ return offset;
+ }
+ }
+ }
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
index 358a760..738c175 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaClientProviderTest.java
@@ -21,10 +21,11 @@ package org.apache.sling.distribution.journal.kafka;
import static java.util.Collections.emptyMap;
import static org.apache.sling.distribution.journal.kafka.util.KafkaEndpointBuilder.buildKafkaEndpoint;
import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
+import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -33,6 +34,7 @@ import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.Reset;
import org.junit.Assert;
@@ -116,4 +118,22 @@ public class KafkaClientProviderTest {
String assign = provider.assignTo(1l);
assertThat(assign, equalTo("0:1"));
}
+
+ @Test
+ public void testAssignToRelative() throws Exception {
+ String assign = provider.assignTo(Reset.latest, -1l);
+ assertThat(assign, equalTo("0:latest:-1"));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidAssign() throws Exception {
+ HandlerAdapter<?> handler = Mockito.mock(HandlerAdapter.class);
+ provider.createPoller(TOPIC, Reset.latest, "", handler);
+ }
+
+ @Test
+ public void testGeServerURI() throws Exception {
+ URI serverUri = provider.getServerUri();
+ assertThat(serverUri.toString(), equalTo("localhost:9092"));
+ }
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
index 097919e..a3dec00 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfoTest.java
@@ -19,7 +19,7 @@
package org.apache.sling.distribution.journal.kafka;
import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;
import org.apache.kafka.clients.consumer.ConsumerRecord;
diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
index 0989667..e12c420 100644
--- a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
@@ -18,9 +18,9 @@
*/
package org.apache.sling.distribution.journal.kafka;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.Closeable;
@@ -112,6 +112,39 @@ public class MessagingTest {
assertReceived("Should see message as we fall back to earliest");
}
}
+
+
+ @Test
+ public void testAssignRelativeLatest() throws Exception {
+ DiscoveryMessage msg = createMessage();
+ MessageSender<DiscoveryMessage> messageSender = provider.createSender(topicName);
+ messageSender.send(msg);
+
+ String assign1 = provider.assignTo(Reset.latest, -1);
+ try (Closeable poller = provider.createPoller(topicName, Reset.latest, assign1, handler)) {
+ assertReceived("Starting from latest:-1 .. should see our message");
+ }
+ String assign2 = provider.assignTo(Reset.latest, 0);
+ try (Closeable poller1 = provider.createPoller(topicName, Reset.latest, assign2, handler)) {
+ assertNotReceived("Starting from latest:0 .. should not see our message");
+ }
+ }
+
+ @Test
+ public void testAssignRelativeEarliest() throws Exception {
+ DiscoveryMessage msg = createMessage();
+ MessageSender<DiscoveryMessage> messageSender = provider.createSender(topicName);
+ messageSender.send(msg);
+
+ String assign1 = provider.assignTo(Reset.earliest, 0);
+ try (Closeable poller = provider.createPoller(topicName, Reset.latest, assign1, handler)) {
+ assertReceived("Starting from earliest:0 .. should see our message");
+ }
+ String assign2 = provider.assignTo(Reset.earliest, 1);
+ try (Closeable poller1 = provider.createPoller(topicName, Reset.latest, assign2, handler)) {
+ assertNotReceived("Starting from earliest:1 .. should not see our message");
+ }
+ }
private DiscoveryMessage createMessage() {
return DiscoveryMessage.builder()