You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@camel.apache.org by or...@apache.org on 2022/06/14 11:02:36 UTC

[camel-examples] branch main updated: CAMEL-18126: improved split support for the file component

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git


The following commit(s) were added to refs/heads/main by this push:
     new d1e5022b CAMEL-18126: improved split support for the file component
d1e5022b is described below

commit d1e5022bb1b43565903359aefaf91bcf23fc9c78
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Mon Jun 13 14:39:51 2022 +0200

    CAMEL-18126: improved split support for the file component
---
 .../kafka/file/LargeFileRouteBuilder.java          | 77 ++++++++--------------
 .../example/resume/file/offset/main/MainApp.java   | 17 ++++-
 2 files changed, 42 insertions(+), 52 deletions(-)

diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
index 63b20fc1..fbb886b6 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
@@ -17,19 +17,16 @@
 
 package org.apache.camel.example.resume.strategies.kafka.file;
 
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.Reader;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.file.consumer.adapters.FileOffset;
+import org.apache.camel.component.file.FileConstants;
 import org.apache.camel.processor.resume.kafka.KafkaResumeStrategy;
 import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.support.resume.Resumables;
-import org.apache.camel.util.IOHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,64 +40,41 @@ public class LargeFileRouteBuilder extends RouteBuilder {
     private final ResumeCache<File> cache;
 
     private long lastOffset;
-    private int batchSize;
+    private long lineCount = 0;
+
     private final CountDownLatch latch;
 
     public LargeFileRouteBuilder(KafkaResumeStrategy resumeStrategy, ResumeCache<File> cache, CountDownLatch latch) {
         this.testResumeStrategy = resumeStrategy;
         this.cache = cache;
-        String tmp = System.getProperty("resume.batch.size", "30");
-        this.batchSize = Integer.valueOf(tmp);
-
         this.latch = latch;
     }
 
-    private void process(Exchange exchange) throws Exception {
-        Reader reader = exchange.getIn().getBody(Reader.class);
-        BufferedReader br = IOHelper.buffered(reader);
-
-        File path = exchange.getMessage().getHeader("CamelFilePath", File.class);
-        LOG.debug("Path: {} ", path);
+    private void process(Exchange exchange) {
+        final String body = exchange.getMessage().getBody(String.class);
+        final String filePath = exchange.getMessage().getHeader(Exchange.FILE_PATH, String.class);
+        final File file = new File(filePath);
 
-        FileOffset offsetContainer = cache.get(path, FileOffset.class);
+        // Get the initial offset and use it to update the last offset when reading the first line
+        final Resumable resumable = exchange.getMessage().getHeader(FileConstants.INITIAL_OFFSET, Resumable.class);
+        final Long value = resumable.getLastOffset().getValue(Long.class);
 
-        if (offsetContainer != null) {
-            lastOffset = offsetContainer.getValue();
-        } else {
-            lastOffset = 0;
+        if (lineCount == 0) {
+            lastOffset += value;
         }
 
-        LOG.debug("Starting to read at offset {}", lastOffset);
-
-        String line = br.readLine();
-        int count = 0;
-        while (count < batchSize) {
-            if (line == null || line.isEmpty()) {
-                LOG.debug("End of file");
-                // EOF, therefore reset the offset
-                final Resumable resumable = Resumables.of(path, 0L);
-                exchange.getMessage().setHeader(Exchange.OFFSET, resumable);
-
-                break;
-            }
+        // It sums w/ 1 in order to account for the newline that is removed by readLine
+        lastOffset += body.length() + 1;
+        lineCount++;
 
-            LOG.debug("Read line at offset {} from the file: {}", lastOffset, line);
-            testResumeStrategy.updateLastOffset(Resumables.of(path, lastOffset));
+        exchange.getMessage().setHeader(Exchange.OFFSET, Resumables.of(file, lastOffset));
 
-            // It sums w/ 1 in order to account for the newline that is removed by readLine
-            lastOffset += line.length() + 1;
-            // Simulate slow processing
-            Thread.sleep(50);
-            count++;
-
-            line = br.readLine();
-        }
-
-        if (count == batchSize) {
-            LOG.info("Reached the last offset in the batch. Stopping ...");
+        LOG.info("Read data: {} / offset key: {} / offset value: {}", body, filePath, lastOffset);
+        if (latch.getCount() == 1) {
             exchange.setRouteStop(true);
-            latch.countDown();
         }
+
+        latch.countDown();
     }
 
     /**
@@ -111,10 +85,15 @@ public class LargeFileRouteBuilder extends RouteBuilder {
         getCamelContext().getRegistry().bind("resumeCache", cache);
 
         from("file:{{input.dir}}?noop=true&fileName={{input.file}}")
-                .resumable("testResumeStrategy")
                 .routeId("largeFileRoute")
-                .convertBodyTo(Reader.class)
-                .process(this::process);
+                .convertBodyTo(String.class)
+                .split(body().tokenize("\n"))
+                    .streaming()
+                    .stopOnException()
+                .resumable()
+                    .resumeStrategy("testResumeStrategy")
+                    .intermittent(true)
+                    .process(this::process);
 
     }
 
diff --git a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
index 9b01173d..aeb71bfb 100644
--- a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
+++ b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
@@ -40,15 +40,18 @@ public class MainApp {
     public static void main(String... args) throws Exception {
         Main main = new Main();
 
-        CountDownLatch latch = new CountDownLatch(1);
+        String tmp = System.getProperty("resume.batch.size", "30");
+        int batchSize = Integer.valueOf(tmp);
+
+        CountDownLatch latch = new CountDownLatch(batchSize);
         SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = getUpdatableConsumerResumeStrategy();
 
-        RouteBuilder routeBuilder = new LargeFileRouteBuilder(resumeStrategy, new CaffeineCache<>(100), latch);
+        RouteBuilder routeBuilder = new LargeFileRouteBuilder(resumeStrategy, new CaffeineCache<>(1), latch);
         main.configure().addRoutesBuilder(routeBuilder);
 
         Executors.newSingleThreadExecutor().submit(() -> waitForStop(main, latch));
 
-        main.run(args);
+        main.start();
     }
 
     private static SingleNodeKafkaResumeStrategy<Resumable> getUpdatableConsumerResumeStrategy() {
@@ -68,6 +71,14 @@ public class MainApp {
     private static void waitForStop(Main main, CountDownLatch latch) {
         try {
             latch.await();
+
+            main.stop();
+            int shutdownWait = 10;
+            while (!main.isStopped() && shutdownWait > 0) {
+                Thread.sleep(1000);
+                shutdownWait--;
+            }
+
             System.exit(0);
         } catch (InterruptedException e) {
             System.exit(1);