You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/06/14 11:02:36 UTC
[camel-examples] branch main updated: CAMEL-18126: improved split support for the file component
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git
The following commit(s) were added to refs/heads/main by this push:
new d1e5022b CAMEL-18126: improved split support for the file component
d1e5022b is described below
commit d1e5022bb1b43565903359aefaf91bcf23fc9c78
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Mon Jun 13 14:39:51 2022 +0200
CAMEL-18126: improved split support for the file component
---
.../kafka/file/LargeFileRouteBuilder.java | 77 ++++++++--------------
.../example/resume/file/offset/main/MainApp.java | 17 ++++-
2 files changed, 42 insertions(+), 52 deletions(-)
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
index 63b20fc1..fbb886b6 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
@@ -17,19 +17,16 @@
package org.apache.camel.example.resume.strategies.kafka.file;
-import java.io.BufferedReader;
import java.io.File;
-import java.io.Reader;
import java.util.concurrent.CountDownLatch;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.file.consumer.adapters.FileOffset;
+import org.apache.camel.component.file.FileConstants;
import org.apache.camel.processor.resume.kafka.KafkaResumeStrategy;
import org.apache.camel.resume.Resumable;
import org.apache.camel.resume.cache.ResumeCache;
import org.apache.camel.support.resume.Resumables;
-import org.apache.camel.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,64 +40,41 @@ public class LargeFileRouteBuilder extends RouteBuilder {
private final ResumeCache<File> cache;
private long lastOffset;
- private int batchSize;
+ private long lineCount = 0;
+
private final CountDownLatch latch;
public LargeFileRouteBuilder(KafkaResumeStrategy resumeStrategy, ResumeCache<File> cache, CountDownLatch latch) {
this.testResumeStrategy = resumeStrategy;
this.cache = cache;
- String tmp = System.getProperty("resume.batch.size", "30");
- this.batchSize = Integer.valueOf(tmp);
-
this.latch = latch;
}
- private void process(Exchange exchange) throws Exception {
- Reader reader = exchange.getIn().getBody(Reader.class);
- BufferedReader br = IOHelper.buffered(reader);
-
- File path = exchange.getMessage().getHeader("CamelFilePath", File.class);
- LOG.debug("Path: {} ", path);
+ private void process(Exchange exchange) {
+ final String body = exchange.getMessage().getBody(String.class);
+ final String filePath = exchange.getMessage().getHeader(Exchange.FILE_PATH, String.class);
+ final File file = new File(filePath);
- FileOffset offsetContainer = cache.get(path, FileOffset.class);
+ // Get the initial offset and use it to update the last offset when reading the first line
+ final Resumable resumable = exchange.getMessage().getHeader(FileConstants.INITIAL_OFFSET, Resumable.class);
+ final Long value = resumable.getLastOffset().getValue(Long.class);
- if (offsetContainer != null) {
- lastOffset = offsetContainer.getValue();
- } else {
- lastOffset = 0;
+ if (lineCount == 0) {
+ lastOffset += value;
}
- LOG.debug("Starting to read at offset {}", lastOffset);
-
- String line = br.readLine();
- int count = 0;
- while (count < batchSize) {
- if (line == null || line.isEmpty()) {
- LOG.debug("End of file");
- // EOF, therefore reset the offset
- final Resumable resumable = Resumables.of(path, 0L);
- exchange.getMessage().setHeader(Exchange.OFFSET, resumable);
-
- break;
- }
+ // Add 1 to account for the newline separator that is dropped when the body is split on "\n"
+ lastOffset += body.length() + 1;
+ lineCount++;
- LOG.debug("Read line at offset {} from the file: {}", lastOffset, line);
- testResumeStrategy.updateLastOffset(Resumables.of(path, lastOffset));
+ exchange.getMessage().setHeader(Exchange.OFFSET, Resumables.of(file, lastOffset));
- // It sums w/ 1 in order to account for the newline that is removed by readLine
- lastOffset += line.length() + 1;
- // Simulate slow processing
- Thread.sleep(50);
- count++;
-
- line = br.readLine();
- }
-
- if (count == batchSize) {
- LOG.info("Reached the last offset in the batch. Stopping ...");
+ LOG.info("Read data: {} / offset key: {} / offset value: {}", body, filePath, lastOffset);
+ if (latch.getCount() == 1) {
exchange.setRouteStop(true);
- latch.countDown();
}
+
+ latch.countDown();
}
/**
@@ -111,10 +85,15 @@ public class LargeFileRouteBuilder extends RouteBuilder {
getCamelContext().getRegistry().bind("resumeCache", cache);
from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
- .resumable("testResumeStrategy")
.routeId("largeFileRoute")
- .convertBodyTo(Reader.class)
- .process(this::process);
+ .convertBodyTo(String.class)
+ .split(body().tokenize("\n"))
+ .streaming()
+ .stopOnException()
+ .resumable()
+ .resumeStrategy("testResumeStrategy")
+ .intermittent(true)
+ .process(this::process);
}
diff --git a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
index 9b01173d..aeb71bfb 100644
--- a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
+++ b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
@@ -40,15 +40,18 @@ public class MainApp {
public static void main(String... args) throws Exception {
Main main = new Main();
- CountDownLatch latch = new CountDownLatch(1);
+ String tmp = System.getProperty("resume.batch.size", "30");
+ int batchSize = Integer.valueOf(tmp);
+
+ CountDownLatch latch = new CountDownLatch(batchSize);
SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = getUpdatableConsumerResumeStrategy();
- RouteBuilder routeBuilder = new LargeFileRouteBuilder(resumeStrategy, new CaffeineCache<>(100), latch);
+ RouteBuilder routeBuilder = new LargeFileRouteBuilder(resumeStrategy, new CaffeineCache<>(1), latch);
main.configure().addRoutesBuilder(routeBuilder);
Executors.newSingleThreadExecutor().submit(() -> waitForStop(main, latch));
- main.run(args);
+ main.start();
}
private static SingleNodeKafkaResumeStrategy<Resumable> getUpdatableConsumerResumeStrategy() {
@@ -68,6 +71,14 @@ public class MainApp {
private static void waitForStop(Main main, CountDownLatch latch) {
try {
latch.await();
+
+ main.stop();
+ int shutdownWait = 10;
+ while (!main.isStopped() && shutdownWait > 0) {
+ Thread.sleep(1000);
+ shutdownWait--;
+ }
+
System.exit(0);
} catch (InterruptedException e) {
System.exit(1);