You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/10 17:26:37 UTC
[camel] branch main updated: CAMEL-18148: cleanup the offset update interfaces
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f7932e4ec60 CAMEL-18148: cleanup the offset update interfaces
f7932e4ec60 is described below
commit f7932e4ec604febdb393577cc5a6f9e5fd77c820
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Mon Oct 10 13:45:28 2022 +0200
CAMEL-18148: cleanup the offset update interfaces
This should simplify further work to improve serialization and reliability
---
.../resume/kafka/KafkaResumeStrategy.java | 6 +---
.../kafka/SingleNodeKafkaResumeStrategy.java | 4 +--
.../KafkaConsumerWithResumeRouteStrategyIT.java | 4 +--
.../org/apache/camel/resume/ResumeStrategy.java | 8 +++++
.../resume/UpdatableConsumerResumeStrategy.java | 34 ----------------------
.../docs/modules/eips/pages/resume-strategies.adoc | 2 +-
.../processor/resume/ResumableCompletion.java | 16 ++++------
.../processor/resume/TransientResumeStrategy.java | 6 ++++
.../FileConsumerResumeFromOffsetStrategyTest.java | 12 ++------
9 files changed, 26 insertions(+), 66 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
index 9951e09303b..925ed2b2ba6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
@@ -17,15 +17,11 @@
package org.apache.camel.processor.resume.kafka;
-import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeStrategy;
-import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
/**
* Base interface for resume strategies that publish the offsets to a Kafka topic
- *
- * @param <T> the type of resumable
*/
-public interface KafkaResumeStrategy<T extends Resumable> extends UpdatableConsumerResumeStrategy<T>, ResumeStrategy {
+public interface KafkaResumeStrategy extends ResumeStrategy {
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 9a60cce760f..8689eeed96e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
* A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node
* integrations.
*/
-public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy<T> {
+public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy {
private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
private Consumer<byte[], byte[]> consumer;
@@ -124,7 +124,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
}
@Override
- public void updateLastOffset(T offset) throws Exception {
+ public <T extends Resumable> void updateLastOffset(T offset) throws Exception {
OffsetKey<?> key = offset.getOffsetKey();
Offset<?> offsetValue = offset.getLastOffset();
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
index 0f7ab577cbf..c6d903cd546 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -34,7 +34,6 @@ import org.apache.camel.resume.Offset;
import org.apache.camel.resume.OffsetKey;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeAdapter;
-import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
import org.apache.camel.support.resume.Resumables;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -62,8 +61,7 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
private TestUpdateStrategy resumeStrategy;
private CountDownLatch messagesLatch;
- private static class TestUpdateStrategy extends TransientResumeStrategy
- implements UpdatableConsumerResumeStrategy<Resumable> {
+ private static class TestUpdateStrategy extends TransientResumeStrategy {
private final CountDownLatch messagesLatch;
private boolean startCalled;
private boolean offsetNull = true;
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index 26039f712de..a9325b829b4 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -57,4 +57,12 @@ public interface ResumeStrategy extends Service {
default void loadCache() throws Exception {
}
+
+ /**
+ * Updates the last processed offset
+ *
+ * @param offset the offset to update
+ * @throws Exception if unable to update the offset
+ */
+ <T extends Resumable> void updateLastOffset(T offset) throws Exception;
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
deleted file mode 100644
index 52204789bc0..00000000000
--- a/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.resume;
-
-/**
- * An updatable resume strategy
- *
- * @param <T> the type of the addressable value for the resumable object (for example, a file would use a Long value)
- */
-public interface UpdatableConsumerResumeStrategy<T extends Resumable> {
-
- /**
- * Updates the last processed offset
- *
- * @param offset the offset to update
- * @throws Exception if unable to update the offset
- */
- void updateLastOffset(T offset) throws Exception;
-}
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index e45a0f898d5..8328d99931c 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -93,7 +93,7 @@ available for each strategy. For instance, to configure either one of the Kafka
=== Implementing New Builtin Resume Strategies
-New builtin resume strategies can be created by implementing the `UpdatableConsumerResumeStrategy` and the `ResumeStrategy` interfaces. Check the code for `SingleNodeKafkaResumeStrategy` for implementation details.
+New builtin resume strategies can be created by implementing the `ResumeStrategy` interface. Check the code for `SingleNodeKafkaResumeStrategy` for implementation details.
== Local Cache Support
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
index dfb75ee5b8f..35035215b59 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
@@ -21,7 +21,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeStrategy;
-import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
import org.apache.camel.spi.CamelLogger;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ExchangeHelper;
@@ -57,17 +56,12 @@ public class ResumableCompletion implements Synchronization {
LOG.trace("Processing the resumable of type: {}", resumable.getLastOffset().getValue());
}
- if (resumeStrategy instanceof UpdatableConsumerResumeStrategy) {
- UpdatableConsumerResumeStrategy updatableConsumerResumeStrategy
- = (UpdatableConsumerResumeStrategy) resumeStrategy;
- try {
- updatableConsumerResumeStrategy.updateLastOffset(resumable);
- } catch (Exception e) {
- LOG.error("Unable to update the offset: {}", e.getMessage(), e);
- }
- } else {
- LOG.debug("Cannot perform an offset update because the strategy is not updatable");
+ try {
+ resumeStrategy.updateLastOffset(resumable);
+ } catch (Exception e) {
+ LOG.error("Unable to update the offset: {}", e.getMessage(), e);
}
+
} else {
if (!intermittent) {
exchange.setException(new NoOffsetException(exchange));
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 205612b17e9..0e926152536 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -17,6 +17,7 @@
package org.apache.camel.processor.resume;
+import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeStrategy;
@@ -46,6 +47,11 @@ public class TransientResumeStrategy implements ResumeStrategy {
return ResumeStrategy.super.getAdapter(clazz);
}
+ @Override
+ public <T extends Resumable> void updateLastOffset(T offset) {
+ // this is NO-OP
+ }
+
@Override
public void start() {
// this is NO-OP
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index e4828ec3b73..3eee355cbe5 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -32,8 +32,6 @@ import org.apache.camel.component.file.consumer.FileResumeAdapter;
import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.resume.TransientResumeStrategy;
-import org.apache.camel.resume.Resumable;
-import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
import org.apache.camel.support.resume.Resumables;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
@@ -75,8 +73,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
}
private static class FailResumeAdapter
- implements FileResumeAdapter, DirectoryEntriesResumeAdapter, UpdatableConsumerResumeStrategy<Resumable> {
- private boolean called;
+ implements FileResumeAdapter, DirectoryEntriesResumeAdapter {
@Override
public void resume() {
@@ -88,10 +85,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
DirectoryEntries.doResume(fileSet, f -> true);
}
- @Override
- public void updateLastOffset(Resumable offset) {
- called = true;
- }
+
}
private static final TransientResumeStrategy FAIL_RESUME_STRATEGY = new TransientResumeStrategy(new FailResumeAdapter());
@@ -125,7 +119,6 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
List<Exchange> exchangeList = mock.getExchanges();
Assertions.assertFalse(exchangeList.isEmpty(), "It should have received a few messages");
- Assertions.assertFalse(((FailResumeAdapter) FAIL_RESUME_STRATEGY.getAdapter()).called);
}
@DisplayName("Tests whether it a missing offset does not cause a failure when using intermittent mode")
@@ -141,7 +134,6 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
List<Exchange> exchangeList = mock.getExchanges();
Assertions.assertFalse(exchangeList.isEmpty(), "It should have received a few messages");
- Assertions.assertFalse(((FailResumeAdapter) FAIL_RESUME_STRATEGY.getAdapter()).called);
}
@DisplayName("Tests whether we can start from the beginning (i.e.: no resume strategy)")