You are viewing a plain text version of this content. The canonical link for it is here. [hyperlink not preserved in this plain-text rendering]
Posted to commits@camel.apache.org by or...@apache.org on 2022/11/23 11:18:46 UTC
[camel] branch camel-3.18.x updated: CAMEL-18717: do save to offset repository when using a commit manager
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.18.x by this push:
new 8112c11a339 CAMEL-18717: do save to offset repository when using a commit manager
8112c11a339 is described below
commit 8112c11a339f0ec434d7dd77e7475c9c46630a4b
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Mon Nov 21 15:09:44 2022 +0100
CAMEL-18717: do save to offset repository when using a commit manager
---
.../kafka/consumer/AbstractCommitManager.java | 20 ++++++
.../kafka/consumer/AsyncCommitManager.java | 18 ++++-
.../kafka/consumer/CommitToOffsetManager.java | 20 ------
.../kafka/consumer/SyncCommitManager.java | 12 +++-
.../integration/BaseManualCommitTestSupport.java | 80 ++++++++++++++++-----
.../KafkaConsumerAsyncWithOffsetRepoCommitIT.java | 81 ++++++++++++++++++++++
.../KafkaConsumerSyncWithOffsetRepoCommitIT.java | 72 +++++++++++++++++++
7 files changed, 265 insertions(+), 38 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
index 6b1b3dd4d5d..fe5abc3e403 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
@@ -91,4 +91,24 @@ public abstract class AbstractCommitManager implements CommitManager {
Duration.ofMillis(timeout));
}
+ protected void saveStateToOffsetRepository(
+ TopicPartition partition, long partitionLastOffset,
+ StateRepository<String, String> offsetRepository) {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Saving offset repository state {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(),
+ partition.partition(),
+ partitionLastOffset);
+ }
+ offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset));
+ }
+
+ protected static String serializeOffsetKey(TopicPartition topicPartition) {
+ return topicPartition.topic() + '/' + topicPartition.partition();
+ }
+
+ protected static String serializeOffsetValue(long offset) {
+ return String.valueOf(offset);
+ }
+
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
index d05778bdaa9..7e117ee4d61 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.component.kafka.KafkaConsumer;
+import org.apache.camel.spi.StateRepository;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -34,11 +35,14 @@ public class AsyncCommitManager extends AbstractCommitManager {
private final Consumer<?, ?> consumer;
private final OffsetCache offsetCache = new OffsetCache();
+ private final StateRepository<String, String> offsetRepository;
public AsyncCommitManager(Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) {
super(consumer, kafkaConsumer, threadId, printableTopic);
this.consumer = consumer;
+
+ offsetRepository = configuration.getOffsetRepository();
}
@Override
@@ -66,7 +70,7 @@ public class AsyncCommitManager extends AbstractCommitManager {
final Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap
= Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1));
- consumer.commitAsync(topicPartitionOffsetAndMetadataMap, offsetCache::removeCommittedEntries);
+ consumer.commitAsync(topicPartitionOffsetAndMetadataMap, this::postCommitCallback);
}
@Override
@@ -85,4 +89,16 @@ public class AsyncCommitManager extends AbstractCommitManager {
public void recordOffset(TopicPartition partition, long partitionLastOffset) {
offsetCache.recordOffset(partition, partitionLastOffset);
}
+
+ private void postCommitCallback(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
+ if (exception == null) {
+ if (offsetRepository != null) {
+ for (var entry : committed.entrySet()) {
+ saveStateToOffsetRepository(entry.getKey(), entry.getValue().offset(), offsetRepository);
+ }
+ }
+ }
+
+ offsetCache.removeCommittedEntries(committed, exception);
+ }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitToOffsetManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitToOffsetManager.java
index 1a073dce22b..ba709e82a7e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitToOffsetManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitToOffsetManager.java
@@ -50,31 +50,11 @@ public class CommitToOffsetManager extends AbstractCommitManager {
saveStateToOffsetRepository(partition, partitionLastOffset, offsetRepository);
}
- private void saveStateToOffsetRepository(
- TopicPartition partition, long partitionLastOffset,
- StateRepository<String, String> offsetRepository) {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Saving offset repository state {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(),
- partition.partition(),
- partitionLastOffset);
- }
- offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset));
- }
-
@Override
public void commit() {
// NO-OP ... commits to offset only
}
- private static String serializeOffsetKey(TopicPartition topicPartition) {
- return topicPartition.topic() + '/' + topicPartition.partition();
- }
-
- private static String serializeOffsetValue(long offset) {
- return String.valueOf(offset);
- }
-
@Override
public void recordOffset(TopicPartition partition, long partitionLastOffset) {
if (partitionLastOffset == START_OFFSET) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/SyncCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/SyncCommitManager.java
index 45b779392e7..a494a8b10a6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/SyncCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/SyncCommitManager.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.Map;
import org.apache.camel.component.kafka.KafkaConsumer;
+import org.apache.camel.spi.StateRepository;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -33,11 +34,14 @@ public class SyncCommitManager extends AbstractCommitManager {
private final OffsetCache offsetCache = new OffsetCache();
private final Consumer<?, ?> consumer;
+ private final StateRepository<String, String> offsetRepository;
public SyncCommitManager(Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) {
super(consumer, kafkaConsumer, threadId, printableTopic);
this.consumer = consumer;
+
+ offsetRepository = configuration.getOffsetRepository();
}
@Override
@@ -61,11 +65,17 @@ public class SyncCommitManager extends AbstractCommitManager {
return;
}
+ final long lastOffset = offset + 1;
+
final Map<TopicPartition, OffsetAndMetadata> offsets
- = Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1));
+ = Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset));
long timeout = configuration.getCommitTimeoutMs();
consumer.commitSync(offsets, Duration.ofMillis(timeout));
+ if (offsetRepository != null) {
+ saveStateToOffsetRepository(partition, lastOffset, offsetRepository);
+ }
+
offsetCache.removeCommittedEntries(offsets, null);
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseManualCommitTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseManualCommitTestSupport.java
index 28b8e0bf3ee..ffbadfc6028 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseManualCommitTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseManualCommitTestSupport.java
@@ -23,9 +23,14 @@ import java.util.Properties;
import org.apache.camel.EndpointInject;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.StateRepository;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.hamcrest.Matchers;
import org.junit.jupiter.api.BeforeEach;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
public class BaseManualCommitTestSupport extends BaseEmbeddedKafkaTestSupport {
@EndpointInject("mock:result")
@@ -51,17 +56,9 @@ public class BaseManualCommitTestSupport extends BaseEmbeddedKafkaTestSupport {
}
public void kafkaManualCommitTest(String topic) throws Exception {
- to.expectedMessageCount(5);
- to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
- // The LAST_RECORD_BEFORE_COMMIT header should include a value as we use
- // manual commit
- to.allMessages().header(KafkaConstants.LAST_RECORD_BEFORE_COMMIT).isNotNull();
+ setupPreExecutionExpectations();
- for (int k = 0; k < 5; k++) {
- String msg = "message-" + k;
- ProducerRecord<String, String> data = new ProducerRecord<>(topic, "1", msg);
- producer.send(data);
- }
+ sendRecords(0, 5, topic);
to.assertIsSatisfied(3000);
@@ -72,11 +69,7 @@ public class BaseManualCommitTestSupport extends BaseEmbeddedKafkaTestSupport {
to.expectedMessageCount(0);
// Third step: While our route is stopped, we send 3 records more to Kafka test topic
- for (int k = 5; k < 8; k++) {
- String msg = "message-" + k;
- ProducerRecord<String, String> data = new ProducerRecord<>(topic, "1", msg);
- producer.send(data);
- }
+ sendRecords(5, 8, topic);
to.assertIsSatisfied(3000);
@@ -85,9 +78,64 @@ public class BaseManualCommitTestSupport extends BaseEmbeddedKafkaTestSupport {
// Fourth step: We start again our route, since we have been committing the offsets from the first step,
// we will expect to consume from the latest committed offset e.g from offset 5
context.getRouteController().startRoute("foo");
+ setupPostExecutionExpectations();
+
+ to.assertIsSatisfied(3000);
+ }
+
+ public void kafkaManualCommitTestWithStateRepository(String topic, StateRepository<String, String> stateRepository)
+ throws Exception {
+ setupPreExecutionExpectations();
+
+ sendRecords(0, 5, topic);
+
+ to.assertIsSatisfied(3000);
+
+ to.reset();
+
+ final String state = Awaitility.await().until(() -> stateRepository.getState(topic + "/0"),
+ Matchers.notNullValue());
+ // We send 5 records initially, so we expect the offset to be 5 after first step execution
+ assertEquals("5", state, "5 messages were sent in the first step, therefore the offset should be 5");
+
+ // Second step: We shut down our route, we expect nothing will be recovered by our route
+ context.getRouteController().stopRoute("foo");
+ to.expectedMessageCount(0);
+
+ // Third step: While our route is stopped, we send 3 records more to Kafka test topic
+ sendRecords(5, 8, topic);
+
+ to.assertIsSatisfied(3000);
+
+ to.reset();
+
+ // Fourth step: We start again our route, since we have been committing the offsets from the first step,
+ // we will expect to consume from the latest committed offset e.g from offset 5
+ context.getRouteController().startRoute("foo");
+ setupPostExecutionExpectations();
+
+ to.assertIsSatisfied(3000);
+ }
+
+ protected void setupPostExecutionExpectations() {
to.expectedMessageCount(3);
to.expectedBodiesReceivedInAnyOrder("message-5", "message-6", "message-7");
+ }
- to.assertIsSatisfied(3000);
+ protected void sendRecords(int startIndex, int lastIndex, String topic) {
+ for (int position = startIndex; position < lastIndex; position++) {
+ String msg = "message-" + position;
+ ProducerRecord<String, String> data = new ProducerRecord<>(topic, "1", msg);
+ producer.send(data);
+ }
+ }
+
+ protected void setupPreExecutionExpectations() {
+ to.expectedMessageCount(5);
+ to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+
+ // The LAST_RECORD_BEFORE_COMMIT header should include a value as we use
+ // manual commit
+ to.allMessages().header(KafkaConstants.LAST_RECORD_BEFORE_COMMIT).isNotNull();
}
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncWithOffsetRepoCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncWithOffsetRepoCommitIT.java
new file mode 100644
index 00000000000..dcdc9e990d5
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncWithOffsetRepoCommitIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration.commit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.integration.BaseManualCommitTestSupport;
+import org.apache.camel.impl.engine.MemoryStateRepository;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class KafkaConsumerAsyncWithOffsetRepoCommitIT extends BaseManualCommitTestSupport {
+
+ public static final String TOPIC = "testAsyncCommitWithOffsetRepoTest";
+
+ @BindToRegistry("stateRepository")
+ private final MemoryStateRepository stateRepository = new MemoryStateRepository();
+
+ @EndpointInject("kafka:" + TOPIC
+ + "?groupId=KafkaConsumerAsyncCommitIT&pollTimeoutMs=1000&autoCommitEnable=false&offsetRepository=#bean:stateRepository"
+ + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory")
+ private Endpoint from;
+
+ public static MemoryStateRepository offsetRepo() {
+ MemoryStateRepository stateRepository = new MemoryStateRepository();
+ stateRepository.setState(TOPIC + "/0", "");
+ return stateRepository;
+ }
+
+ @AfterEach
+ public void after() {
+ cleanupKafka(TOPIC);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() {
+ from(from)
+ .routeId("foo").to(to).process(e -> {
+ KafkaManualCommit manual
+ = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+
+ assertNotNull(manual);
+ manual.commit();
+ });
+ from(from).routeId("bar").autoStartup(false).to(toBar);
+ }
+ };
+ }
+
+ @DisplayName("Tests that the offset repository gets updated when using in conjunction with the Async commit manager")
+ @Test
+ public void kafkaManualCommitWithOffsetRepo() throws Exception {
+ kafkaManualCommitTestWithStateRepository(TOPIC, stateRepository);
+ }
+
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncWithOffsetRepoCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncWithOffsetRepoCommitIT.java
new file mode 100644
index 00000000000..159d1c9499e
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncWithOffsetRepoCommitIT.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration.commit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.integration.BaseManualCommitTestSupport;
+import org.apache.camel.impl.engine.MemoryStateRepository;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class KafkaConsumerSyncWithOffsetRepoCommitIT extends BaseManualCommitTestSupport {
+
+ public static final String TOPIC = "testManualCommitSyncWithOffsetRepoTest";
+
+ @BindToRegistry("stateRepository")
+ private final MemoryStateRepository stateRepository = new MemoryStateRepository();
+
+ @EndpointInject("kafka:" + TOPIC
+ + "?groupId=KafkaConsumerSyncCommitIT&pollTimeoutMs=1000&autoCommitEnable=false&offsetRepository=#bean:stateRepository"
+ + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
+ private Endpoint from;
+
+ @AfterEach
+ public void after() {
+ cleanupKafka(TOPIC);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() {
+ from(from).routeId("foo").to(to).process(e -> {
+ KafkaManualCommit manual = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+ assertNotNull(manual);
+ manual.commit();
+ });
+ from(from).routeId("bar").autoStartup(false).to(toBar);
+ }
+ };
+ }
+
+ @DisplayName("Tests that the offset repository gets updated when using in conjunction with the Sync commit manager")
+ @Test
+ public void kafkaManualCommitWithOffsetRepo() throws Exception {
+ kafkaManualCommitTestWithStateRepository(TOPIC, stateRepository);
+ }
+
+}