You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/11/22 12:18:40 UTC

[camel] branch main updated: CAMEL-18717: do save to offset repository when using a commit manager

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new a9a1c650d5e CAMEL-18717: do save to offset repository when using a commit manager
a9a1c650d5e is described below

commit a9a1c650d5e980637400c5579a359fe62d8662ca
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Mon Nov 21 15:09:44 2022 +0100

    CAMEL-18717: do save to offset repository when using a commit manager
---
 .../kafka/consumer/AbstractCommitManager.java      | 20 ++++++
 .../kafka/consumer/AsyncCommitManager.java         | 18 ++++-
 .../kafka/consumer/CommitToOffsetManager.java      | 20 ------
 .../kafka/consumer/SyncCommitManager.java          | 12 +++-
 .../integration/BaseManualCommitTestSupport.java   | 80 ++++++++++++++++-----
 .../KafkaConsumerAsyncWithOffsetRepoCommitIT.java  | 81 ++++++++++++++++++++++
 .../KafkaConsumerSyncWithOffsetRepoCommitIT.java   | 72 +++++++++++++++++++
 7 files changed, 265 insertions(+), 38 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
index 6b1b3dd4d5d..fe5abc3e403 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AbstractCommitManager.java
@@ -91,4 +91,24 @@ public abstract class AbstractCommitManager implements CommitManager {
                 Duration.ofMillis(timeout));
     }
 
+    protected void saveStateToOffsetRepository(
+            TopicPartition partition, long partitionLastOffset,
+            StateRepository<String, String> offsetRepository) {
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Saving offset repository state {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(),
+                    partition.partition(),
+                    partitionLastOffset);
+        }
+        offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset));
+    }
+
+    protected static String serializeOffsetKey(TopicPartition topicPartition) {
+        return topicPartition.topic() + '/' + topicPartition.partition();
+    }
+
+    protected static String serializeOffsetValue(long offset) {
+        return String.valueOf(offset);
+    }
+
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
index d05778bdaa9..7e117ee4d61 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/AsyncCommitManager.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.component.kafka.KafkaConsumer;
+import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -34,11 +35,14 @@ public class AsyncCommitManager extends AbstractCommitManager {
     private final Consumer<?, ?> consumer;
 
     private final OffsetCache offsetCache = new OffsetCache();
+    private final StateRepository<String, String> offsetRepository;
 
     public AsyncCommitManager(Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) {
         super(consumer, kafkaConsumer, threadId, printableTopic);
 
         this.consumer = consumer;
+
+        offsetRepository = configuration.getOffsetRepository();
     }
 
     @Override
@@ -66,7 +70,7 @@ public class AsyncCommitManager extends AbstractCommitManager {
 
         final Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap
                 = Collections.singletonMap(partition, new OffsetAndMetadata(partitionLastOffset + 1));
-        consumer.commitAsync(topicPartitionOffsetAndMetadataMap, offsetCache::removeCommittedEntries);
+        consumer.commitAsync(topicPartitionOffsetAndMetadataMap, this::postCommitCallback);
     }
 
     @Override
@@ -85,4 +89,16 @@ public class AsyncCommitManager extends AbstractCommitManager {
     public void recordOffset(TopicPartition partition, long partitionLastOffset) {
         offsetCache.recordOffset(partition, partitionLastOffset);
     }
+
+    private void postCommitCallback(Map<TopicPartition, OffsetAndMetadata> committed, Exception exception) {
+        if (exception == null) {
+            if (offsetRepository != null) {
+                for (var entry : committed.entrySet()) {
+                    saveStateToOffsetRepository(entry.getKey(), entry.getValue().offset(), offsetRepository);
+                }
+            }
+        }
+
+        offsetCache.removeCommittedEntries(committed, exception);
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitToOffsetManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitToOffsetManager.java
index 1a073dce22b..ba709e82a7e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitToOffsetManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/CommitToOffsetManager.java
@@ -50,31 +50,11 @@ public class CommitToOffsetManager extends AbstractCommitManager {
         saveStateToOffsetRepository(partition, partitionLastOffset, offsetRepository);
     }
 
-    private void saveStateToOffsetRepository(
-            TopicPartition partition, long partitionLastOffset,
-            StateRepository<String, String> offsetRepository) {
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Saving offset repository state {} [topic: {} partition: {} offset: {}]", threadId, partition.topic(),
-                    partition.partition(),
-                    partitionLastOffset);
-        }
-        offsetRepository.setState(serializeOffsetKey(partition), serializeOffsetValue(partitionLastOffset));
-    }
-
     @Override
     public void commit() {
         // NO-OP ... commits to offset only
     }
 
-    private static String serializeOffsetKey(TopicPartition topicPartition) {
-        return topicPartition.topic() + '/' + topicPartition.partition();
-    }
-
-    private static String serializeOffsetValue(long offset) {
-        return String.valueOf(offset);
-    }
-
     @Override
     public void recordOffset(TopicPartition partition, long partitionLastOffset) {
         if (partitionLastOffset == START_OFFSET) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/SyncCommitManager.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/SyncCommitManager.java
index 45b779392e7..a494a8b10a6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/SyncCommitManager.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/SyncCommitManager.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.Map;
 
 import org.apache.camel.component.kafka.KafkaConsumer;
+import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -33,11 +34,14 @@ public class SyncCommitManager extends AbstractCommitManager {
 
     private final OffsetCache offsetCache = new OffsetCache();
     private final Consumer<?, ?> consumer;
+    private final StateRepository<String, String> offsetRepository;
 
     public SyncCommitManager(Consumer<?, ?> consumer, KafkaConsumer kafkaConsumer, String threadId, String printableTopic) {
         super(consumer, kafkaConsumer, threadId, printableTopic);
 
         this.consumer = consumer;
+
+        offsetRepository = configuration.getOffsetRepository();
     }
 
     @Override
@@ -61,11 +65,17 @@ public class SyncCommitManager extends AbstractCommitManager {
             return;
         }
 
+        final long lastOffset = offset + 1;
+
         final Map<TopicPartition, OffsetAndMetadata> offsets
-                = Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1));
+                = Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset));
         long timeout = configuration.getCommitTimeoutMs();
         consumer.commitSync(offsets, Duration.ofMillis(timeout));
 
+        if (offsetRepository != null) {
+            saveStateToOffsetRepository(partition, lastOffset, offsetRepository);
+        }
+
         offsetCache.removeCommittedEntries(offsets, null);
     }
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseManualCommitTestSupport.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseManualCommitTestSupport.java
index 28b8e0bf3ee..ffbadfc6028 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseManualCommitTestSupport.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/BaseManualCommitTestSupport.java
@@ -23,9 +23,14 @@ import java.util.Properties;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.component.kafka.KafkaConstants;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.spi.StateRepository;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeEach;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 public class BaseManualCommitTestSupport extends BaseEmbeddedKafkaTestSupport {
 
     @EndpointInject("mock:result")
@@ -51,17 +56,9 @@ public class BaseManualCommitTestSupport extends BaseEmbeddedKafkaTestSupport {
     }
 
     public void kafkaManualCommitTest(String topic) throws Exception {
-        to.expectedMessageCount(5);
-        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
-        // The LAST_RECORD_BEFORE_COMMIT header should include a value as we use
-        // manual commit
-        to.allMessages().header(KafkaConstants.LAST_RECORD_BEFORE_COMMIT).isNotNull();
+        setupPreExecutionExpectations();
 
-        for (int k = 0; k < 5; k++) {
-            String msg = "message-" + k;
-            ProducerRecord<String, String> data = new ProducerRecord<>(topic, "1", msg);
-            producer.send(data);
-        }
+        sendRecords(0, 5, topic);
 
         to.assertIsSatisfied(3000);
 
@@ -72,11 +69,7 @@ public class BaseManualCommitTestSupport extends BaseEmbeddedKafkaTestSupport {
         to.expectedMessageCount(0);
 
         // Third step: While our route is stopped, we send 3 records more to Kafka test topic
-        for (int k = 5; k < 8; k++) {
-            String msg = "message-" + k;
-            ProducerRecord<String, String> data = new ProducerRecord<>(topic, "1", msg);
-            producer.send(data);
-        }
+        sendRecords(5, 8, topic);
 
         to.assertIsSatisfied(3000);
 
@@ -85,9 +78,64 @@ public class BaseManualCommitTestSupport extends BaseEmbeddedKafkaTestSupport {
         // Fourth step: We start again our route, since we have been committing the offsets from the first step,
         // we will expect to consume from the latest committed offset e.g from offset 5
         context.getRouteController().startRoute("foo");
+        setupPostExecutionExpectations();
+
+        to.assertIsSatisfied(3000);
+    }
+
+    public void kafkaManualCommitTestWithStateRepository(String topic, StateRepository<String, String> stateRepository)
+            throws Exception {
+        setupPreExecutionExpectations();
+
+        sendRecords(0, 5, topic);
+
+        to.assertIsSatisfied(3000);
+
+        to.reset();
+
+        final String state = Awaitility.await().until(() -> stateRepository.getState(topic + "/0"),
+                Matchers.notNullValue());
+        // We send 5 records initially, so we expect the offset to be 5 after first step execution
+        assertEquals("5", state, "5 messages were sent in the first step, therefore the offset should be 5");
+
+        // Second step: We shut down our route; while it is stopped, we expect it to receive nothing
+        context.getRouteController().stopRoute("foo");
+        to.expectedMessageCount(0);
+
+        // Third step: While our route is stopped, we send 3 records more to Kafka test topic
+        sendRecords(5, 8, topic);
+
+        to.assertIsSatisfied(3000);
+
+        to.reset();
+
+        // Fourth step: We start our route again. Since we have been committing the offsets from the first step,
+        // we expect to resume consuming from the latest committed offset, e.g. from offset 5
+        context.getRouteController().startRoute("foo");
+        setupPostExecutionExpectations();
+
+        to.assertIsSatisfied(3000);
+    }
+
+    protected void setupPostExecutionExpectations() {
         to.expectedMessageCount(3);
         to.expectedBodiesReceivedInAnyOrder("message-5", "message-6", "message-7");
+    }
 
-        to.assertIsSatisfied(3000);
+    protected void sendRecords(int startIndex, int lastIndex, String topic) {
+        for (int position = startIndex; position < lastIndex; position++) {
+            String msg = "message-" + position;
+            ProducerRecord<String, String> data = new ProducerRecord<>(topic, "1", msg);
+            producer.send(data);
+        }
+    }
+
+    protected void setupPreExecutionExpectations() {
+        to.expectedMessageCount(5);
+        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
+
+        // The LAST_RECORD_BEFORE_COMMIT header should include a value as we use
+        // manual commit
+        to.allMessages().header(KafkaConstants.LAST_RECORD_BEFORE_COMMIT).isNotNull();
     }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncWithOffsetRepoCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncWithOffsetRepoCommitIT.java
new file mode 100644
index 00000000000..dcdc9e990d5
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerAsyncWithOffsetRepoCommitIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration.commit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.integration.BaseManualCommitTestSupport;
+import org.apache.camel.impl.engine.MemoryStateRepository;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class KafkaConsumerAsyncWithOffsetRepoCommitIT extends BaseManualCommitTestSupport {
+
+    public static final String TOPIC = "testAsyncCommitWithOffsetRepoTest";
+
+    @BindToRegistry("stateRepository")
+    private final MemoryStateRepository stateRepository = new MemoryStateRepository();
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=KafkaConsumerAsyncCommitIT&pollTimeoutMs=1000&autoCommitEnable=false&offsetRepository=#bean:stateRepository"
+                    + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualAsyncCommitFactory")
+    private Endpoint from;
+
+    public static MemoryStateRepository offsetRepo() {
+        MemoryStateRepository stateRepository = new MemoryStateRepository();
+        stateRepository.setState(TOPIC + "/0", "");
+        return stateRepository;
+    }
+
+    @AfterEach
+    public void after() {
+        cleanupKafka(TOPIC);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from)
+                        .routeId("foo").to(to).process(e -> {
+                            KafkaManualCommit manual
+                                    = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+
+                            assertNotNull(manual);
+                            manual.commit();
+                        });
+                from(from).routeId("bar").autoStartup(false).to(toBar);
+            }
+        };
+    }
+
+    @DisplayName("Tests that the offset repository gets updated when using in conjunction with the Async commit manager")
+    @Test
+    public void kafkaManualCommitWithOffsetRepo() throws Exception {
+        kafkaManualCommitTestWithStateRepository(TOPIC, stateRepository);
+    }
+
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncWithOffsetRepoCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncWithOffsetRepoCommitIT.java
new file mode 100644
index 00000000000..159d1c9499e
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncWithOffsetRepoCommitIT.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration.commit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.integration.BaseManualCommitTestSupport;
+import org.apache.camel.impl.engine.MemoryStateRepository;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class KafkaConsumerSyncWithOffsetRepoCommitIT extends BaseManualCommitTestSupport {
+
+    public static final String TOPIC = "testManualCommitSyncWithOffsetRepoTest";
+
+    @BindToRegistry("stateRepository")
+    private final MemoryStateRepository stateRepository = new MemoryStateRepository();
+
+    @EndpointInject("kafka:" + TOPIC
+                    + "?groupId=KafkaConsumerSyncCommitIT&pollTimeoutMs=1000&autoCommitEnable=false&offsetRepository=#bean:stateRepository"
+                    + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
+    private Endpoint from;
+
+    @AfterEach
+    public void after() {
+        cleanupKafka(TOPIC);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(from).routeId("foo").to(to).process(e -> {
+                    KafkaManualCommit manual = e.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+                    assertNotNull(manual);
+                    manual.commit();
+                });
+                from(from).routeId("bar").autoStartup(false).to(toBar);
+            }
+        };
+    }
+
+    @DisplayName("Tests that the offset repository gets updated when using in conjunction with the Sync commit manager")
+    @Test
+    public void kafkaManualCommitWithOffsetRepo() throws Exception {
+        kafkaManualCommitTestWithStateRepository(TOPIC, stateRepository);
+    }
+
+}