You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/10 17:26:37 UTC

[camel] branch main updated: CAMEL-18148: cleanup the offset update interfaces

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new f7932e4ec60 CAMEL-18148: cleanup the offset update interfaces
f7932e4ec60 is described below

commit f7932e4ec604febdb393577cc5a6f9e5fd77c820
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Mon Oct 10 13:45:28 2022 +0200

    CAMEL-18148: cleanup the offset update interfaces
    
    This should simplify further work to improve serialization and reliability
---
 .../resume/kafka/KafkaResumeStrategy.java          |  6 +---
 .../kafka/SingleNodeKafkaResumeStrategy.java       |  4 +--
 .../KafkaConsumerWithResumeRouteStrategyIT.java    |  4 +--
 .../org/apache/camel/resume/ResumeStrategy.java    |  8 +++++
 .../resume/UpdatableConsumerResumeStrategy.java    | 34 ----------------------
 .../docs/modules/eips/pages/resume-strategies.adoc |  2 +-
 .../processor/resume/ResumableCompletion.java      | 16 ++++------
 .../processor/resume/TransientResumeStrategy.java  |  6 ++++
 .../FileConsumerResumeFromOffsetStrategyTest.java  | 12 ++------
 9 files changed, 26 insertions(+), 66 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
index 9951e09303b..925ed2b2ba6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
@@ -17,15 +17,11 @@
 
 package org.apache.camel.processor.resume.kafka;
 
-import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeStrategy;
-import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 
 /**
  * Base interface for resume strategies that publish the offsets to a Kafka topic
- * 
- * @param <T> the type of resumable
  */
-public interface KafkaResumeStrategy<T extends Resumable> extends UpdatableConsumerResumeStrategy<T>, ResumeStrategy {
+public interface KafkaResumeStrategy extends ResumeStrategy {
 
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 9a60cce760f..8689eeed96e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -59,7 +59,7 @@ import org.slf4j.LoggerFactory;
  * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node
  * integrations.
  */
-public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy<T> {
+public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
 
     private Consumer<byte[], byte[]> consumer;
@@ -124,7 +124,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
     }
 
     @Override
-    public void updateLastOffset(T offset) throws Exception {
+    public <T extends Resumable> void updateLastOffset(T offset) throws Exception {
         OffsetKey<?> key = offset.getOffsetKey();
         Offset<?> offsetValue = offset.getLastOffset();
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
index 0f7ab577cbf..c6d903cd546 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -34,7 +34,6 @@ import org.apache.camel.resume.Offset;
 import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeAdapter;
-import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 import org.apache.camel.support.resume.Resumables;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -62,8 +61,7 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
     private TestUpdateStrategy resumeStrategy;
     private CountDownLatch messagesLatch;
 
-    private static class TestUpdateStrategy extends TransientResumeStrategy
-            implements UpdatableConsumerResumeStrategy<Resumable> {
+    private static class TestUpdateStrategy extends TransientResumeStrategy {
         private final CountDownLatch messagesLatch;
         private boolean startCalled;
         private boolean offsetNull = true;
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index 26039f712de..a9325b829b4 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -57,4 +57,12 @@ public interface ResumeStrategy extends Service {
     default void loadCache() throws Exception {
 
     }
+
+    /**
+     * Updates the last processed offset
+     *
+     * @param  offset    the offset to update
+     * @throws Exception if unable to update the offset
+     */
+    <T extends Resumable> void updateLastOffset(T offset) throws Exception;
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
deleted file mode 100644
index 52204789bc0..00000000000
--- a/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.resume;
-
-/**
- * An updatable resume strategy
- *
- * @param <T> the type of the addressable value for the resumable object (for example, a file would use a Long value)
- */
-public interface UpdatableConsumerResumeStrategy<T extends Resumable> {
-
-    /**
-     * Updates the last processed offset
-     * 
-     * @param  offset    the offset to update
-     * @throws Exception if unable to update the offset
-     */
-    void updateLastOffset(T offset) throws Exception;
-}
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index e45a0f898d5..8328d99931c 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -93,7 +93,7 @@ available for each strategy. For instance, to configure either one of the Kafka
 
 === Implementing New Builtin Resume Strategies
 
-New builtin resume strategies can be created by implementing the `UpdatableConsumerResumeStrategy` and the `ResumeStrategy` interfaces. Check the code for `SingleNodeKafkaResumeStrategy` for implementation details.
+New builtin resume strategies can be created by implementing the `ResumeStrategy` interface. Check the code for `SingleNodeKafkaResumeStrategy` for implementation details.
 
 == Local Cache Support
 
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
index dfb75ee5b8f..35035215b59 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
@@ -21,7 +21,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeStrategy;
-import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 import org.apache.camel.spi.CamelLogger;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ExchangeHelper;
@@ -57,17 +56,12 @@ public class ResumableCompletion implements Synchronization {
                 LOG.trace("Processing the resumable of type: {}", resumable.getLastOffset().getValue());
             }
 
-            if (resumeStrategy instanceof UpdatableConsumerResumeStrategy) {
-                UpdatableConsumerResumeStrategy updatableConsumerResumeStrategy
-                        = (UpdatableConsumerResumeStrategy) resumeStrategy;
-                try {
-                    updatableConsumerResumeStrategy.updateLastOffset(resumable);
-                } catch (Exception e) {
-                    LOG.error("Unable to update the offset: {}", e.getMessage(), e);
-                }
-            } else {
-                LOG.debug("Cannot perform an offset update because the strategy is not updatable");
+            try {
+                resumeStrategy.updateLastOffset(resumable);
+            } catch (Exception e) {
+                LOG.error("Unable to update the offset: {}", e.getMessage(), e);
             }
+
         } else {
             if (!intermittent) {
                 exchange.setException(new NoOffsetException(exchange));
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index 205612b17e9..0e926152536 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.processor.resume;
 
+import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeStrategy;
 
@@ -46,6 +47,11 @@ public class TransientResumeStrategy implements ResumeStrategy {
         return ResumeStrategy.super.getAdapter(clazz);
     }
 
+    @Override
+    public <T extends Resumable> void updateLastOffset(T offset) {
+        // this is NO-OP
+    }
+
     @Override
     public void start() {
         // this is NO-OP
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index e4828ec3b73..3eee355cbe5 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -32,8 +32,6 @@ import org.apache.camel.component.file.consumer.FileResumeAdapter;
 import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
-import org.apache.camel.resume.Resumable;
-import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 import org.apache.camel.support.resume.Resumables;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.DisplayName;
@@ -75,8 +73,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
     }
 
     private static class FailResumeAdapter
-            implements FileResumeAdapter, DirectoryEntriesResumeAdapter, UpdatableConsumerResumeStrategy<Resumable> {
-        private boolean called;
+            implements FileResumeAdapter, DirectoryEntriesResumeAdapter {
 
         @Override
         public void resume() {
@@ -88,10 +85,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
             DirectoryEntries.doResume(fileSet, f -> true);
         }
 
-        @Override
-        public void updateLastOffset(Resumable offset) {
-            called = true;
-        }
+
     }
 
     private static final TransientResumeStrategy FAIL_RESUME_STRATEGY = new TransientResumeStrategy(new FailResumeAdapter());
@@ -125,7 +119,6 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
 
         List<Exchange> exchangeList = mock.getExchanges();
         Assertions.assertFalse(exchangeList.isEmpty(), "It should have received a few messages");
-        Assertions.assertFalse(((FailResumeAdapter) FAIL_RESUME_STRATEGY.getAdapter()).called);
     }
 
     @DisplayName("Tests whether it a missing offset does not cause a failure when using intermittent mode")
@@ -141,7 +134,6 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
 
         List<Exchange> exchangeList = mock.getExchanges();
         Assertions.assertFalse(exchangeList.isEmpty(), "It should have received a few messages");
-        Assertions.assertFalse(((FailResumeAdapter) FAIL_RESUME_STRATEGY.getAdapter()).called);
     }
 
     @DisplayName("Tests whether we can start from the beginning (i.e.: no resume strategy)")