You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/11/07 21:35:57 UTC
[camel] branch main updated: CAMEL-18688: add full resume support for Kafka
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 59046fe140f CAMEL-18688: add full resume support for Kafka
59046fe140f is described below
commit 59046fe140ff0278dd5a243a4b61e4220b3eee20
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Nov 4 16:07:40 2022 +0100
CAMEL-18688: add full resume support for Kafka
---
.../org/apache/camel/kafka-adapter-factory | 2 +
.../camel/component/kafka/KafkaConsumer.java | 5 +
.../camel/component/kafka/KafkaFetchRecords.java | 19 +-
.../consumer/support/ResumeStrategyFactory.java | 92 --------
.../support/classic/AssignmentAdapterHelper.java | 49 +++++
.../ClassicRebalanceListener.java} | 31 +--
.../classic/NoOpPartitionAssignmentAdapter.java | 38 ++++
.../OffsetPartitionAssignmentAdapter.java} | 16 +-
.../PartitionAssignmentAdapter.java} | 13 +-
.../SeekPolicyPartitionAssignmentAdapter.java} | 15 +-
.../consumer/support/resume/KafkaResumable.java | 62 ++++++
.../support/resume/KafkaResumeAdapter.java | 113 ++++++++++
.../ResumeRebalanceListener.java} | 32 ++-
.../kafka/SingleNodeKafkaResumeStrategy.java | 5 +-
...KafkaConsumerAutoInstResumeRouteStrategyIT.java | 113 ++++++++++
.../KafkaConsumerWithResumeRouteStrategyIT.java | 239 ---------------------
.../processor/resume/TransientResumeStrategy.java | 113 +++++-----
17 files changed, 493 insertions(+), 464 deletions(-)
diff --git a/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-adapter-factory b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-adapter-factory
new file mode 100644
index 00000000000..10d19f83eab
--- /dev/null
+++ b/components/camel-kafka/src/generated/resources/META-INF/services/org/apache/camel/kafka-adapter-factory
@@ -0,0 +1,2 @@
+# Generated by camel build tools - do NOT edit this file!
+class=org.apache.camel.component.kafka.consumer.support.resume.KafkaResumeAdapter
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index e1e29660b5c..dad29c73ad5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -238,4 +238,9 @@ public class KafkaConsumer extends DefaultConsumer
public List<TaskHealthState> healthStates() {
return tasks.stream().map(t -> t.healthState()).collect(Collectors.toList());
}
+ /** Returns the name of the service entry used to look up the resume adapter for this consumer. */
+ @Override
+ public String adapterFactoryService() {
+ return "kafka-adapter-factory";
+ }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index f94e3e118a5..b31a3898b5a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -27,11 +27,10 @@ import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.CommitManagers;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaErrorStrategies;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter;
import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
-import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
-import org.apache.camel.component.kafka.consumer.support.ResumeStrategyFactory;
+import org.apache.camel.component.kafka.consumer.support.classic.ClassicRebalanceListener;
+import org.apache.camel.component.kafka.consumer.support.resume.ResumeRebalanceListener;
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
import org.apache.camel.support.task.ForegroundTask;
import org.apache.camel.support.task.Tasks;
@@ -40,6 +39,7 @@ import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ReflectionHelper;
import org.apache.camel.util.TimeUtils;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
@@ -272,11 +272,16 @@ public class KafkaFetchRecords implements Runnable {
}
private void subscribe() {
- KafkaConsumerResumeAdapter resumeStrategy = ResumeStrategyFactory.resolveResumeAdapter(kafkaConsumer);
- resumeStrategy.setConsumer(consumer);
+ ConsumerRebalanceListener listener;
- PartitionAssignmentListener listener = new PartitionAssignmentListener(
- threadId, kafkaConsumer.getEndpoint().getConfiguration(), commitManager, resumeStrategy);
+ if (kafkaConsumer.getResumeStrategy() == null) {
+ listener = new ClassicRebalanceListener(
+ threadId, kafkaConsumer.getEndpoint().getConfiguration(), commitManager, consumer);
+ } else {
+ listener = new ResumeRebalanceListener(
+ threadId, kafkaConsumer.getEndpoint().getConfiguration(),
+ commitManager, consumer, kafkaConsumer.getResumeStrategy());
+ }
if (LOG.isInfoEnabled()) {
LOG.info("Subscribing {} to {}", threadId, getPrintableTopic());
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
deleted file mode 100644
index 77315f2e85b..00000000000
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka.consumer.support;
-
-import org.apache.camel.component.kafka.KafkaConfiguration;
-import org.apache.camel.component.kafka.KafkaConsumer;
-import org.apache.camel.component.kafka.SeekPolicy;
-import org.apache.camel.resume.ResumeStrategy;
-import org.apache.camel.spi.StateRepository;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class ResumeStrategyFactory {
-
- /**
- * A NO-OP resume strategy that does nothing (i.e.: no resume)
- */
- private static class NoOpKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
-
- @SuppressWarnings("unused")
- @Override
- public void setConsumer(Consumer<?, ?> consumer) {
- // NO-OP
- }
-
- @Override
- public void setKafkaResumable(KafkaResumable kafkaResumable) {
- // NO-OP
- }
-
- @SuppressWarnings("unused")
- @Override
- public void resume() {
-
- }
- }
-
- private static final NoOpKafkaConsumerResumeAdapter NO_OP_RESUME_STRATEGY = new NoOpKafkaConsumerResumeAdapter();
- private static final Logger LOG = LoggerFactory.getLogger(ResumeStrategyFactory.class);
-
- private ResumeStrategyFactory() {
- }
-
- public static KafkaConsumerResumeAdapter resolveResumeAdapter(KafkaConsumer kafkaConsumer) {
- // When using resumable routes, which register the strategy via service, it takes priority over everything else
- ResumeStrategy resumeStrategy = kafkaConsumer.getResumeStrategy();
- if (resumeStrategy != null) {
- KafkaConsumerResumeAdapter adapter = resumeStrategy.getAdapter(KafkaConsumerResumeAdapter.class);
-
- // The strategy should not be able to be created without an adapter, but let's be safe
- assert adapter != null;
-
- return adapter;
- }
-
- KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration();
-
- return resolveBuiltinResumeAdapters(configuration);
- }
-
- private static KafkaConsumerResumeAdapter resolveBuiltinResumeAdapters(KafkaConfiguration configuration) {
- LOG.debug("No resume strategy was provided ... checking for built-ins ...");
- StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
- SeekPolicy seekTo = configuration.getSeekTo();
-
- if (offsetRepository != null) {
- LOG.info("Using resume from offset strategy");
- return new OffsetKafkaConsumerResumeAdapter(offsetRepository);
- } else if (seekTo != null) {
- LOG.info("Using resume from seek policy strategy with seeking from {}", seekTo);
- return new SeekPolicyKafkaConsumerResumeAdapter(seekTo);
- }
-
- LOG.info("Using NO-OP resume strategy");
- return NO_OP_RESUME_STRATEGY;
- }
-}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/AssignmentAdapterHelper.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/AssignmentAdapterHelper.java
new file mode 100644
index 00000000000..082e53bdfef
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/AssignmentAdapterHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.consumer.support.classic;
+
+import org.apache.camel.component.kafka.KafkaConfiguration;
+import org.apache.camel.component.kafka.SeekPolicy;
+import org.apache.camel.spi.StateRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class AssignmentAdapterHelper {
+ // Static-only helper that picks the built-in partition assignment adapter for a Kafka configuration
+ private static final NoOpPartitionAssignmentAdapter NO_OP_ASSIGNMENT_ADAPTER = new NoOpPartitionAssignmentAdapter();
+ private static final Logger LOG = LoggerFactory.getLogger(AssignmentAdapterHelper.class);
+
+ private AssignmentAdapterHelper() {
+ }
+
+ public static PartitionAssignmentAdapter resolveBuiltinResumeAdapters(KafkaConfiguration configuration) {
+ LOG.debug("No resume strategy was provided ... checking for built-ins ...");
+ StateRepository<String, String> offsetRepository = configuration.getOffsetRepository();
+ SeekPolicy seekTo = configuration.getSeekTo();
+ // The offset repository takes precedence over the seekTo policy
+ if (offsetRepository != null) {
+ LOG.info("Using resume from offset strategy");
+ return new OffsetPartitionAssignmentAdapter(offsetRepository);
+ } else if (seekTo != null) {
+ LOG.info("Using resume from seek policy strategy with seeking from {}", seekTo);
+ return new SeekPolicyPartitionAssignmentAdapter(seekTo);
+ }
+ // Neither is configured: return the shared NO-OP adapter instance
+ LOG.info("Using NO-OP resume strategy");
+ return NO_OP_ASSIGNMENT_ADAPTER;
+ }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
similarity index 67%
copy from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
copy to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
index 03046f7a10b..ba11d1a7657 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/ClassicRebalanceListener.java
@@ -15,34 +15,34 @@
* limitations under the License.
*/
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.classic;
import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.consumer.CommitManager;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PartitionAssignmentListener implements ConsumerRebalanceListener {
- private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class);
+public class ClassicRebalanceListener implements ConsumerRebalanceListener {
+ private static final Logger LOG = LoggerFactory.getLogger(ClassicRebalanceListener.class);
private final String threadId;
private final KafkaConfiguration configuration;
- private final KafkaConsumerResumeAdapter resumeStrategy;
+ private final PartitionAssignmentAdapter assignmentAdapter;
private final CommitManager commitManager;
- public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration,
- CommitManager commitManager,
- KafkaConsumerResumeAdapter resumeStrategy) {
+ public ClassicRebalanceListener(String threadId, KafkaConfiguration configuration,
+ CommitManager commitManager, Consumer<?, ?> consumer) {
this.threadId = threadId;
this.configuration = configuration;
this.commitManager = commitManager;
- this.resumeStrategy = resumeStrategy;
+
+ assignmentAdapter = AssignmentAdapterHelper.resolveBuiltinResumeAdapters(configuration);
+ assignmentAdapter.setConsumer(consumer);
}
@Override
@@ -59,19 +59,10 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-
if (LOG.isDebugEnabled()) {
partitions.forEach(p -> LOG.debug("onPartitionsAssigned: {} from {}", threadId, p.topic()));
-
}
- List<KafkaResumable> resumables = partitions.stream()
- .map(p -> new KafkaResumable(String.valueOf(p.partition()), p.topic())).collect(Collectors.toList());
-
- resumables.forEach(this::doResume);
- }
- private void doResume(KafkaResumable r) {
- resumeStrategy.setKafkaResumable(r);
- resumeStrategy.resume();
+ assignmentAdapter.handlePartitionAssignment();
}
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/NoOpPartitionAssignmentAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/NoOpPartitionAssignmentAdapter.java
new file mode 100644
index 00000000000..b97f7689e82
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/NoOpPartitionAssignmentAdapter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support.classic;
+
+import org.apache.kafka.clients.consumer.Consumer;
+
+/**
+ * A NO-OP partition assignment adapter: it ignores the consumer and does nothing when partitions are assigned
+ */
+public class NoOpPartitionAssignmentAdapter implements PartitionAssignmentAdapter {
+
+ @SuppressWarnings("unused")
+ @Override
+ public void setConsumer(Consumer<?, ?> consumer) {
+ // NO-OP
+ }
+
+ @SuppressWarnings("unused")
+ @Override
+ public void handlePartitionAssignment() {
+ // NO-OP
+ }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java
similarity index 86%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java
index 3ef4f209c43..4227ad7f004 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeAdapter.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/OffsetPartitionAssignmentAdapter.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.classic;
import java.util.Set;
@@ -30,14 +30,14 @@ import static org.apache.camel.component.kafka.consumer.support.KafkaRecordProce
/**
* A resume strategy that uses Kafka's offset for resuming
*/
-public class OffsetKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
+public class OffsetPartitionAssignmentAdapter implements PartitionAssignmentAdapter {
- private static final Logger LOG = LoggerFactory.getLogger(OffsetKafkaConsumerResumeAdapter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(OffsetPartitionAssignmentAdapter.class);
private final StateRepository<String, String> offsetRepository;
private Consumer<?, ?> consumer;
- public OffsetKafkaConsumerResumeAdapter(StateRepository<String, String> offsetRepository) {
+ public OffsetPartitionAssignmentAdapter(StateRepository<String, String> offsetRepository) {
this.offsetRepository = offsetRepository;
}
@@ -46,11 +46,6 @@ public class OffsetKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdap
this.consumer = consumer;
}
- @Override
- public void setKafkaResumable(KafkaResumable kafkaResumable) {
- // NO-OP
- }
-
private void resumeFromOffset(final Consumer<?, ?> consumer, TopicPartition topicPartition, String offsetState) {
// The state contains the last read offset, so you need to seek from the next one
long offset = deserializeOffsetValue(offsetState) + 1;
@@ -58,8 +53,7 @@ public class OffsetKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdap
consumer.seek(topicPartition, offset);
}
- @Override
- public void resume() {
+ public void handlePartitionAssignment() {
Set<TopicPartition> assignments = consumer.assignment();
for (TopicPartition topicPartition : assignments) {
String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition));
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
similarity index 75%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
index d25c05662cb..3c6a92d2542 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeAdapter.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/PartitionAssignmentAdapter.java
@@ -14,9 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.classic;
-import org.apache.camel.resume.ResumeAdapter;
import org.apache.kafka.clients.consumer.Consumer;
/**
@@ -27,7 +26,7 @@ import org.apache.kafka.clients.consumer.Consumer;
* the component is set up to use more than one of them. As such, implementations are responsible for ensuring the
* thread-safety of the operations within the resume method.
*/
-public interface KafkaConsumerResumeAdapter extends ResumeAdapter {
+public interface PartitionAssignmentAdapter {
/**
* Sets the Kafka consumer instance for the adapter. Please note that the Kafka consumer is not safe for concurrent
@@ -37,11 +36,5 @@ public interface KafkaConsumerResumeAdapter extends ResumeAdapter {
*/
void setConsumer(Consumer<?, ?> consumer);
- /**
- * Sets an optional resumable instance for the adapter. This is usually set during partition assignment. Garanteed
- * not to be null and safe to ignore if partition and topic information are not used.
- *
- * @param kafkaResumable the resumable instance
- */
- void setKafkaResumable(KafkaResumable kafkaResumable);
+ void handlePartitionAssignment();
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/SeekPolicyPartitionAssignmentAdapter.java
similarity index 81%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/SeekPolicyPartitionAssignmentAdapter.java
index 0dee4e1b849..b97839199d6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeAdapter.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/classic/SeekPolicyPartitionAssignmentAdapter.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.classic;
import org.apache.camel.component.kafka.SeekPolicy;
import org.apache.kafka.clients.consumer.Consumer;
@@ -24,14 +24,14 @@ import org.slf4j.LoggerFactory;
/**
* A resume strategy that uses Camel's seekTo configuration for resuming
*/
-public class SeekPolicyKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
+public class SeekPolicyPartitionAssignmentAdapter implements PartitionAssignmentAdapter {
- private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyKafkaConsumerResumeAdapter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SeekPolicyPartitionAssignmentAdapter.class);
private final SeekPolicy seekPolicy;
private Consumer<?, ?> consumer;
- public SeekPolicyKafkaConsumerResumeAdapter(SeekPolicy seekPolicy) {
+ public SeekPolicyPartitionAssignmentAdapter(SeekPolicy seekPolicy) {
this.seekPolicy = seekPolicy;
}
@@ -41,12 +41,7 @@ public class SeekPolicyKafkaConsumerResumeAdapter implements KafkaConsumerResume
}
@Override
- public void setKafkaResumable(KafkaResumable kafkaResumable) {
- // NO-OP
- }
-
- @Override
- public void resume() {
+ public void handlePartitionAssignment() {
if (seekPolicy == SeekPolicy.BEGINNING) {
LOG.debug("Seeking from the beginning of topic");
consumer.seekToBeginning(consumer.assignment());
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumable.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumable.java
new file mode 100644
index 00000000000..bcbcf578d20
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumable.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support.resume;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.Resumable;
+import org.apache.camel.support.resume.OffsetKeys;
+import org.apache.camel.support.resume.Offsets;
+
+public final class KafkaResumable implements Resumable {
+ private final String addressable;
+ private final Long offset;
+ // Private on purpose: instances are created through of(Exchange)
+ private KafkaResumable(String addressable, Long offset) {
+ this.addressable = addressable;
+ this.offset = offset;
+ }
+
+ @Override
+ public OffsetKey<?> getOffsetKey() {
+ return OffsetKeys.unmodifiableOf(addressable);
+ }
+
+ @Override
+ public Offset<?> getLastOffset() {
+ return Offsets.of(offset);
+ }
+
+ /**
+ * Creates a new resumable for Kafka from the TOPIC, PARTITION and OFFSET headers of the exchange message
+ *
+ * @param exchange the exchange to create the resumable from
+ * @return a new KafkaResumable instance with the data from the exchange
+ */
+ public static KafkaResumable of(Exchange exchange) {
+ String topic = exchange.getMessage().getHeader(KafkaConstants.TOPIC, String.class);
+ Integer partition = exchange.getMessage().getHeader(KafkaConstants.PARTITION, Integer.class);
+ Long offset = exchange.getMessage().getHeader(KafkaConstants.OFFSET, Long.class);
+ // The offset key has the "topic/partition" form
+ String topicPartition = topic + "/" + partition;
+
+ return new KafkaResumable(topicPartition, offset);
+ }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java
new file mode 100644
index 00000000000..641e5b5af06
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/KafkaResumeAdapter.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.kafka.consumer.support.resume;
+
+import java.nio.ByteBuffer;
+
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.spi.annotations.JdkService;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Resume adapter for Kafka: keeps offsets per topic/partition in the resume cache and seeks to them. */
+@JdkService("kafka-adapter-factory")
+public class KafkaResumeAdapter implements ResumeAdapter, Deserializable, Cacheable {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaResumeAdapter.class);
+
+ private Consumer<?, ?> consumer;
+ private ResumeCache<TopicPartition> resumeCache;
+
+ /** Seeks the consumer to the given offset of one partition; always returns true. */
+ private boolean resume(TopicPartition topicPartition, Object value) {
+ consumer.seek(topicPartition, (Long) value);
+ return true;
+ }
+
+ /** Applies the seek to each entry of the resume cache. */
+ @Override
+ public void resume() {
+ resumeCache.forEach(this::resume);
+ }
+
+ /**
+ * Deserializes one resume record and adds it to the resume cache. Keys must have the "topic/partition"
+ * form and values must be Long offsets; anything else is logged and discarded.
+ */
+ @Override
+ public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
+ Object keyObj = deserializeKey(keyBuffer);
+ Object valueObj = deserializeValue(valueBuffer);
+ if (keyObj instanceof String) {
+ String key = (String) keyObj;
+ final String[] keyParts = key.split("/");
+ // Only a key with exactly two parts can be indexed and parsed as topic and partition
+ if (keyParts.length == 2) {
+ String topic = keyParts[0];
+ int partition = Integer.parseInt(keyParts[1]);
+ if (valueObj instanceof Long) {
+ Long offset = (Long) valueObj;
+ resumeCache.add(new TopicPartition(topic, partition), offset);
+ } else {
+ LOG.warn("The type for the key '{}' is invalid: {}", key, valueObj);
+ }
+ } else {
+ LOG.warn("Unable to deserialize key '{}' because it has in invalid format and it will be discarded",
+ key);
+ }
+ } else {
+ LOG.warn("Unable to deserialize key '{}' because its type is invalid", keyObj);
+ }
+ // NOTE(review): false is returned even after a successful add - confirm what the caller expects
+ return false;
+ }
+
+ /** Caches the offset when the key wraps a TopicPartition; other key types are ignored. Returns true. */
+ @Override
+ public boolean add(OffsetKey<?> key, Offset<?> offset) {
+ Object keyObj = key.getValue();
+ Long valueObject = offset.getValue(Long.class);
+
+ if (keyObj instanceof TopicPartition) {
+ TopicPartition topicPartition = (TopicPartition) keyObj;
+ resumeCache.add(topicPartition, valueObject);
+ }
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setCache(ResumeCache<?> cache) {
+ this.resumeCache = (ResumeCache<TopicPartition>) cache;
+ }
+
+ @Override
+ public ResumeCache<?> getCache() {
+ return resumeCache;
+ }
+
+ public void setConsumer(Consumer<?, ?> consumer) {
+ this.consumer = consumer;
+ }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
similarity index 67%
rename from components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
rename to components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
index 03046f7a10b..a7a47b72fa1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/resume/ResumeRebalanceListener.java
@@ -15,34 +15,36 @@
* limitations under the License.
*/
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.kafka.consumer.support.resume;
import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.consumer.CommitManager;
+import org.apache.camel.component.kafka.consumer.support.classic.ClassicRebalanceListener;
+import org.apache.camel.resume.ResumeStrategy;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PartitionAssignmentListener implements ConsumerRebalanceListener {
- private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class);
+public class ResumeRebalanceListener implements ConsumerRebalanceListener {
+    // Log under this listener's own category so its output is not attributed to ClassicRebalanceListener
+    private static final Logger LOG = LoggerFactory.getLogger(ResumeRebalanceListener.class);
private final String threadId;
private final KafkaConfiguration configuration;
- private final KafkaConsumerResumeAdapter resumeStrategy;
private final CommitManager commitManager;
+ private final KafkaResumeAdapter resumeAdapter;
- public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration,
- CommitManager commitManager,
- KafkaConsumerResumeAdapter resumeStrategy) {
+ public ResumeRebalanceListener(String threadId, KafkaConfiguration configuration,
+ CommitManager commitManager, Consumer<?, ?> consumer, ResumeStrategy resumeStrategy) {
this.threadId = threadId;
this.configuration = configuration;
this.commitManager = commitManager;
- this.resumeStrategy = resumeStrategy;
+
+ resumeAdapter = resumeStrategy.getAdapter(KafkaResumeAdapter.class);
+ resumeAdapter.setConsumer(consumer);
}
@Override
@@ -59,19 +61,11 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-
if (LOG.isDebugEnabled()) {
partitions.forEach(p -> LOG.debug("onPartitionsAssigned: {} from {}", threadId, p.topic()));
-
}
- List<KafkaResumable> resumables = partitions.stream()
- .map(p -> new KafkaResumable(String.valueOf(p.partition()), p.topic())).collect(Collectors.toList());
- resumables.forEach(this::doResume);
+ resumeAdapter.resume();
}
- private void doResume(KafkaResumable r) {
- resumeStrategy.setKafkaResumable(r);
- resumeStrategy.resume();
- }
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index c3863b6508e..32ee0a4af97 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -346,7 +346,10 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
@Override
public ResumeAdapter getAdapter() {
- waitForInitialization();
+ if (adapter == null) {
+ waitForInitialization();
+ }
+
return adapter;
}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java
new file mode 100644
index 00000000000..17a69566ab2
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.consumer.support.resume.KafkaResumable;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for a resumable Kafka route whose resume strategy is set up from a
+ * {@link KafkaResumeStrategyConfigurationBuilder} instead of a pre-built strategy instance.
+ *
+ * Ten records are produced to {@link #TOPIC} before each test. The resumable route tags every exchange
+ * with a {@link KafkaResumable} offset, and a second route consumes the offsets topic so the test can
+ * check that one offset update per record was published.
+ */
+public class KafkaConsumerAutoInstResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAutoInstResumeRouteStrategyIT.class);
+    private static final String TOPIC = "resumable-route-auto";
+    // Topic the resume strategy is configured with; also consumed by the verification route
+    private static final String OFFSETS_TOPIC = "resumable-route-auto-offsets";
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    /**
+     * Creates the resume strategy configuration used by the resumable route.
+     *
+     * @return a builder pointing at the embedded Kafka instance and at the offsets topic, using a simple
+     *         in-memory resume cache
+     */
+    public static KafkaResumeStrategyConfigurationBuilder getDefaultKafkaResumeStrategyConfigurationBuilder() {
+        return KafkaResumeStrategyConfigurationBuilder.newBuilder()
+                .withBootstrapServers(service.getBootstrapServers())
+                .withTopic(OFFSETS_TOPIC)
+                .withResumeCache(TransientResumeStrategy.createSimpleCache())
+                .withProducerProperty("max.block.ms", "10000")
+                .withMaxInitializationDuration(Duration.ofSeconds(5))
+                .withProducerProperty("delivery.timeout.ms", "30000")
+                .withProducerProperty("session.timeout.ms", "15000")
+                .withProducerProperty("request.timeout.ms", "15000")
+                .withConsumerProperty("session.timeout.ms", "20000");
+    }
+
+    /**
+     * Produces the ten records consumed by the resumable route.
+     */
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+
+        // Closing the producer waits for the pending sends to complete and releases its resources
+        try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(props)) {
+            for (int i = 0; i < 10; i++) {
+                producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i)));
+            }
+        }
+    }
+
+    /**
+     * Expects ten messages on mock:result, which is fed by the route consuming the offsets topic.
+     */
+    @Test
+    @Timeout(value = 30)
+    public void testOffsetIsBeingChecked() throws InterruptedException {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+
+        mock.expectedMessageCount(10);
+        mock.assertIsSatisfied();
+    }
+
+    @AfterEach
+    public void after() {
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    /**
+     * Sets the {@link Exchange#OFFSET} header to the resumable built from the exchange.
+     *
+     * @param exchange the exchange consumed from {@link #TOPIC}
+     */
+    public void process(Exchange exchange) {
+        exchange.getMessage().setHeader(Exchange.OFFSET, KafkaResumable.of(exchange));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // Resumable route under test: consumes the records and tags each exchange with its offset
+                fromF("kafka:%s?groupId=%s_GROUP&autoCommitIntervalMs=1000"
+                      + "&autoOffsetReset=earliest&consumersCount=1", TOPIC, TOPIC)
+                        .resumable().configuration(getDefaultKafkaResumeStrategyConfigurationBuilder())
+                        .process(e -> process(e))
+                        .routeId("resume-strategy-auto-route")
+                        // Note: this is for manually testing the ResumableCompletion onFailure exception logging. Uncomment it for testing it
+                        // .process(e -> e.setException(new RuntimeCamelException("Mock error in test")))
+                        .to("mock:sentMessages");
+
+                // Verification route: forwards whatever lands on the offsets topic to mock:result
+                fromF("kafka:%s?groupId=%s_GROUP&autoCommitIntervalMs=1000", OFFSETS_TOPIC, OFFSETS_TOPIC)
+                        .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
deleted file mode 100644
index c6d903cd546..00000000000
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka.integration;
-
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.camel.BindToRegistry;
-import org.apache.camel.EndpointInject;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter;
-import org.apache.camel.component.kafka.consumer.support.KafkaResumable;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.resume.TransientResumeStrategy;
-import org.apache.camel.resume.Offset;
-import org.apache.camel.resume.OffsetKey;
-import org.apache.camel.resume.Resumable;
-import org.apache.camel.resume.ResumeAdapter;
-import org.apache.camel.support.resume.Resumables;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerWithResumeRouteStrategyIT.class);
- private static final String TOPIC = "resumable-route-tp";
- private static final int RANDOM_VALUE = ThreadLocalRandom.current().nextInt(1, 1000);
-
- @EndpointInject("mock:result")
- private MockEndpoint result;
-
- @BindToRegistry("resumeStrategy")
- private TestUpdateStrategy resumeStrategy;
- private CountDownLatch messagesLatch;
-
- private static class TestUpdateStrategy extends TransientResumeStrategy {
- private final CountDownLatch messagesLatch;
- private boolean startCalled;
- private boolean offsetNull = true;
- private boolean offsetAddressableNull = true;
- private boolean offsetAddressableEmpty = true;
- private boolean offsetValueNull = true;
- private boolean offsetValueEmpty = true;
- private int lastOffset;
-
- public TestUpdateStrategy(ResumeAdapter resumeAdapter, CountDownLatch messagesLatch) {
- super(resumeAdapter);
-
- this.messagesLatch = messagesLatch;
- }
-
- @Override
- public void start() {
- LOG.warn("Start was called");
- startCalled = true;
- }
-
- @Override
- public void init() {
- LOG.warn("Init was called");
- }
-
- @Override
- public void updateLastOffset(Resumable offset) {
- try {
- if (offset != null) {
- offsetNull = false;
-
- OffsetKey<?> addressable = offset.getOffsetKey();
- if (addressable != null) {
- offsetAddressableNull = false;
- offsetAddressableEmpty = addressable.getValue() == null;
-
- }
-
- Offset<?> offsetValue = offset.getLastOffset();
- if (offsetValue != null) {
- offsetValueNull = false;
-
- if (offsetValue.getValue() != null) {
- offsetValueEmpty = false;
- lastOffset = (int) offsetValue.getValue();
- }
- }
- }
- } finally {
- messagesLatch.countDown();
- }
- }
-
- public boolean isOffsetNull() {
- return offsetNull;
- }
-
- public boolean isOffsetAddressableNull() {
- return offsetAddressableNull;
- }
-
- public boolean isOffsetValueNull() {
- return offsetValueNull;
- }
-
- public boolean isOffsetAddressableEmpty() {
- return offsetAddressableEmpty;
- }
-
- public boolean isOffsetValueEmpty() {
- return offsetValueEmpty;
- }
-
- public boolean isStartCalled() {
- return startCalled;
- }
- }
-
- private static class TestKafkaConsumerResumeAdapter implements KafkaConsumerResumeAdapter {
- private boolean resumeCalled;
- private boolean consumerIsNull = true;
-
- @Override
- public void setConsumer(Consumer<?, ?> consumer) {
- if (consumer != null) {
- consumerIsNull = false;
- }
- }
-
- @Override
- public void setKafkaResumable(KafkaResumable kafkaResumable) {
-
- }
-
- @Override
- public void resume() {
- resumeCalled = true;
- }
-
- public boolean isResumeCalled() {
- return resumeCalled;
- }
-
- public boolean isConsumerIsNull() {
- return consumerIsNull;
- }
- }
-
- @BeforeEach
- public void before() {
- Properties props = getDefaultProperties();
- KafkaProducer<Object, Object> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
-
- for (int i = 0; i < 10; i++) {
- producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i)));
- }
- }
-
- @Override
- protected void doPreSetup() throws Exception {
- super.doPreSetup();
-
- messagesLatch = new CountDownLatch(1);
- resumeStrategy = new TestUpdateStrategy(new TestKafkaConsumerResumeAdapter(), messagesLatch);
- }
-
- @Test
- // @Timeout(value = 30)
- public void testOffsetIsBeingChecked() throws InterruptedException {
- assertTrue(messagesLatch.await(100, TimeUnit.SECONDS), "The resume was not called");
-
- final TestKafkaConsumerResumeAdapter adapter = resumeStrategy.getAdapter(TestKafkaConsumerResumeAdapter.class);
- assertNotNull(adapter, "The adapter should not be null");
- assertTrue(adapter.isResumeCalled(),
- "The resume strategy should have been called when the partition was assigned");
- assertFalse(adapter.isConsumerIsNull(),
- "The consumer passed to the strategy should not be null");
- assertTrue(resumeStrategy.isStartCalled(),
- "The resume strategy should have been started");
- assertFalse(resumeStrategy.isOffsetNull(),
- "The offset should not be null");
- assertFalse(resumeStrategy.isOffsetAddressableNull(),
- "The offset addressable should not be null");
- assertFalse(resumeStrategy.isOffsetAddressableEmpty(),
- "The offset addressable should not be empty");
- assertFalse(resumeStrategy.isOffsetValueNull(),
- "The offset value should not be null");
- assertFalse(resumeStrategy.isOffsetValueEmpty(),
- "The offset value should not be empty");
- assertEquals(RANDOM_VALUE, resumeStrategy.lastOffset, "the offsets don't match");
- }
-
- @AfterEach
- public void after() {
- kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
- }
-
- @Override
- protected RouteBuilder createRouteBuilder() {
- return new RouteBuilder() {
- @Override
- public void configure() {
- from("kafka:" + TOPIC + "?groupId=" + TOPIC + "_GROUP&autoCommitIntervalMs=1000"
- + "&autoOffsetReset=earliest&consumersCount=1")
- .resumable().resumeStrategy("resumeStrategy", "DEBUG")
- .routeId("resume-strategy-route")
- .setHeader(Exchange.OFFSET, constant(Resumables.of("key", RANDOM_VALUE)))
- // Note: this is for manually testing the ResumableCompletion onFailure exception logging. Uncomment it for testing it
- // .process(e -> e.setException(new RuntimeCamelException("Mock error in test")))
- .to("mock:result");
- }
- };
- }
-}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 0cfe5d86703..89ca17daf87 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -119,61 +119,7 @@ public class TransientResumeStrategy implements ResumeStrategy {
@Override
public ResumeCache<?> getResumeCache() {
- return new ResumeCache<>() {
- private Map<Object, Object> cache = new HashMap<>();
-
- @Override
- public Object computeIfAbsent(Object key, Function<? super Object, ? super Object> mapping) {
- return cache.computeIfAbsent(key, mapping);
- }
-
- @Override
- public Object computeIfPresent(
- Object key, BiFunction<? super Object, ? super Object, ? super Object> remapping) {
- return cache.computeIfPresent(key, remapping);
- }
-
- @Override
- public boolean contains(Object key, Object entry) {
- return Objects.equals(cache.get(key), entry);
- }
-
- @Override
- public void add(Object key, Object offsetValue) {
- cache.put(key, offsetValue);
- }
-
- @Override
- public boolean isFull() {
- return false;
- }
-
- @Override
- public long capacity() {
- return Integer.MAX_VALUE;
- }
-
- @Override
- public <T> T get(Object key, Class<T> clazz) {
- final Object o = cache.get(key);
-
- return clazz.cast(o);
- }
-
- @Override
- public Object get(Object key) {
- return cache.get(key);
- }
-
- @Override
- public void forEach(BiFunction<? super Object, ? super Object, Boolean> action) {
- for (Map.Entry e : cache.entrySet()) {
- if (!action.apply(e.getKey(), e.getValue())) {
- cache.remove(e.getKey());
- }
- }
- }
- };
+ return createSimpleCache();
}
@Override
@@ -184,4 +130,61 @@ public class TransientResumeStrategy implements ResumeStrategy {
}
};
}
+
+    /**
+     * Creates a simple resume cache backed by a {@link HashMap}.
+     *
+     * The returned cache never reports itself as full and declares a capacity of {@link Integer#MAX_VALUE}.
+     * It adds no synchronization of its own on top of the underlying map.
+     *
+     * @return a new, empty cache instance
+     */
+    public static ResumeCache<Object> createSimpleCache() {
+        return new ResumeCache<>() {
+            private final Map<Object, Object> cache = new HashMap<>();
+
+            @Override
+            public Object computeIfAbsent(Object key, Function<? super Object, ? super Object> mapping) {
+                return cache.computeIfAbsent(key, mapping);
+            }
+
+            @Override
+            public Object computeIfPresent(Object key, BiFunction<? super Object, ? super Object, ? super Object> remapping) {
+                return cache.computeIfPresent(key, remapping);
+            }
+
+            @Override
+            public boolean contains(Object key, Object entry) {
+                return Objects.equals(cache.get(key), entry);
+            }
+
+            @Override
+            public void add(Object key, Object offsetValue) {
+                cache.put(key, offsetValue);
+            }
+
+            @Override
+            public boolean isFull() {
+                return false;
+            }
+
+            @Override
+            public long capacity() {
+                return Integer.MAX_VALUE;
+            }
+
+            @Override
+            public <T> T get(Object key, Class<T> clazz) {
+                final Object o = cache.get(key);
+
+                return clazz.cast(o);
+            }
+
+            @Override
+            public Object get(Object key) {
+                return cache.get(key);
+            }
+
+            /**
+             * Applies the action to every entry and evicts the entries for which it returns {@code false}.
+             */
+            @Override
+            public void forEach(BiFunction<? super Object, ? super Object, Boolean> action) {
+                // removeIf evicts through the iterator. Calling cache.remove() while looping over entrySet()
+                // makes the iterator throw ConcurrentModificationException on its next step.
+                cache.entrySet().removeIf(entry -> !action.apply(entry.getKey(), entry.getValue()));
+            }
+        };
+    }
}