You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/03/01 18:24:20 UTC
[camel] 14/16: CAMEL-15562: simplify defining and setting the resume strategy
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit bbd4a84d5c75fbf8bf27f23f3bf537a8b4cd3e93
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Mar 1 12:54:22 2022 +0100
CAMEL-15562: simplify defining and setting the resume strategy
---
.../integration/KafkaConsumerWithResumeRouteStrategyIT.java | 2 +-
.../java/org/apache/camel/model/ProcessorDefinition.java | 10 ++++++++++
.../java/org/apache/camel/model/ResumableDefinition.java | 12 +++++++++---
.../main/java/org/apache/camel/reifier/ResumableReifier.java | 2 +-
.../file/FileConsumerResumeFromOffsetStrategyTest.java | 4 ++--
.../camel/component/file/FileConsumerResumeStrategyTest.java | 2 +-
6 files changed, 24 insertions(+), 8 deletions(-)
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
index c6a27ca..dbc9d91 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -225,7 +225,7 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTes
.routeId("resume-strategy-route")
.setHeader(Exchange.OFFSET,
constant(Resumables.of("key", RANDOM_VALUE)))
- .resumable().resumableStrategyRef("resumeStrategy")
+ .resumable().resumeStrategy("resumeStrategy")
.to("mock:result");
}
};
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 7769c3e..f6f2b19 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -44,6 +44,7 @@ import org.apache.camel.Expression;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
+import org.apache.camel.ResumeStrategy;
import org.apache.camel.builder.DataFormatClause;
import org.apache.camel.builder.EndpointConsumerBuilder;
import org.apache.camel.builder.EndpointProducerBuilder;
@@ -3787,6 +3788,15 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
return answer;
}
+ public ResumableDefinition resumable(ResumeStrategy resumeStrategy) {
+ ResumableDefinition answer = new ResumableDefinition();
+
+ answer.setResumeStrategy(resumeStrategy);
+
+ addOutput(answer);
+ return answer;
+ }
+
// Properties
// -------------------------------------------------------------------------
diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
index 3a92d32..a2f4611 100644
--- a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
+++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
@@ -22,8 +22,8 @@ import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.ResumeStrategy;
-public class ResumableDefinition extends OutputExpressionNode {
- @XmlAttribute(required = true)
+public class ResumableDefinition extends NoOutputDefinition<ResumableDefinition> {
+ @XmlAttribute(required = true, name = "resumeStrategy")
private String resumeStrategyRef;
@XmlTransient
@@ -52,9 +52,15 @@ public class ResumableDefinition extends OutputExpressionNode {
// Fluent API
// -------------------------------------------------------------------------
- public ResumableDefinition resumableStrategyRef(String resumeStrategyRef) {
+ public ResumableDefinition resumeStrategy(String resumeStrategyRef) {
setResumeStrategyRef(resumeStrategyRef);
return this;
}
+
+ public ResumableDefinition resumeStrategy(ResumeStrategy resumeStrategy) {
+ setResumeStrategy(resumeStrategy);
+
+ return this;
+ }
}
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
index ecf626a..9fcb1a5 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -25,7 +25,7 @@ import org.apache.camel.model.ResumableDefinition;
import org.apache.camel.processor.resume.ResumableProcessor;
import org.apache.camel.util.ObjectHelper;
-public class ResumableReifier extends ExpressionReifier<ResumableDefinition> {
+public class ResumableReifier extends ProcessorReifier<ResumableDefinition> {
protected ResumableReifier(Route route, ProcessorDefinition<?> definition) {
super(route, ResumableDefinition.class.cast(definition));
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index 2db4785..a084eb8 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -110,12 +110,12 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
from(fileUri("resumeOff?noop=true&recursive=true"))
.setHeader(Exchange.OFFSET,
constant(Resumables.of("resume-none.txt", 3)))
- .resumable().resumableStrategyRef("resumeStrategy")
+ .resumable().resumeStrategy("resumeStrategy")
.log("${body}")
.convertBodyTo(String.class).to("mock:result");
from(fileUri("resumeMissingOffset?noop=true&recursive=true"))
- .resumable().resumableStrategyRef("resumeStrategy")
+ .resumable().resumeStrategy("resumeStrategy")
.log("${body}")
.convertBodyTo(String.class).to("mock:result");
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index bf662ee..470a459 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -103,7 +103,7 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
from(fileUri("resume?noop=true&recursive=true"))
.process(e -> setOffset(e))
- .resumable().resumableStrategyRef("testResumeStrategy")
+ .resumable().resumeStrategy("testResumeStrategy")
.convertBodyTo(String.class)
.to("mock:result");
}