You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/08/19 04:20:26 UTC

[camel-examples] branch main updated (77537fe3 -> e97db54a)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git


    from 77537fe3 CAMEL-6645: A camel-mapstruct example
     new 907be061 (chores) camel-resume-api-examples: avoid blocking for too long by default
     new 32d671c9 (chores) camel-resume-api-examples: add a new route for checking the results
     new 2491ed2b (chores) camel-resume-api-examples: minor logging adjustments for debugging
     new ddfa2658 (chores) camel-resume-api-examples: remove the unnecessary throttling
     new e17fb5fd (chores) camel-resume-api-examples: add support for verifying the results
     new e37b65f0 (chores) camel-resume-api-examples: logging cleanups for resume-api-file-offset
     new f2664dce (chores) camel-resume-api-examples: use the default strategy builder as part of CAMEL-18362
     new 9674ff8e (chores) camel-resume-api-examples: support checking results for the offset example
     new f2469b66 (chores) camel-resume-api-examples: adjust the initialization duration to match the launch script
     new 3236a87b (chores) camel-resume-api-examples: avoid blocking the check route for too long
     new 7fc2c181 (chores) camel-resume-api-examples: minor stability cleanups
     new e97db54a (chores) camel-resume-api: cleaned up the example with clustering

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 examples/resume-api/resume-api-common/pom.xml      |  4 +++
 .../example/resume/strategies/kafka/KafkaUtil.java | 19 +++--------
 .../resume/strategies/kafka/check/CheckRoute.java  | 14 ++++++++
 .../kafka/file/LargeFileRouteBuilder.java          |  8 +++++
 .../kafka/fileset/LargeDirectoryRouteBuilder.java  |  5 +--
 .../src/main/docker/Dockerfile                     |  2 +-
 .../example/resume/file/offset/main/MainApp.java   |  2 +-
 .../src/main/resources/log4j2.properties           | 15 ++++++++-
 .../resume-api-file-offset/src/main/scripts/run.sh | 37 +++++++++++++++++---
 .../clusterized/main/ClusterizedListener.java      | 22 +++---------
 .../ClusterizedLargeDirectoryRouteBuilder.java     |  2 +-
 .../resume-api-fileset/src/main/docker/Dockerfile  |  1 +
 .../camel/example/resume/fileset/main/MainApp.java |  3 ++
 .../src/main/resources/log4j2.properties           | 14 +++++++-
 .../resume-api-fileset/src/main/scripts/run.sh     | 39 ++++++++++++++++++----
 15 files changed, 134 insertions(+), 53 deletions(-)
 create mode 100644 examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/check/CheckRoute.java


[camel-examples] 03/12: (chores) camel-resume-api-examples: minor logging adjustments for debugging

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit 2491ed2b3ca9ed2fd4bd33b633f6b5bccd3f4d04
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 10:40:14 2022 +0200

    (chores) camel-resume-api-examples: minor logging adjustments for debugging
---
 .../src/main/resources/log4j2.properties                   | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/examples/resume-api/resume-api-fileset/src/main/resources/log4j2.properties b/examples/resume-api/resume-api-fileset/src/main/resources/log4j2.properties
index fb11b710..6c4fcac7 100644
--- a/examples/resume-api/resume-api-fileset/src/main/resources/log4j2.properties
+++ b/examples/resume-api/resume-api-fileset/src/main/resources/log4j2.properties
@@ -29,7 +29,7 @@ logger.camel.additivity = false
 logger.camel.appenderRef.file.ref = rolling-out
 
 logger.camel-resume.name = org.apache.camel.processor.resume
-logger.camel-resume.level = DEBUG
+logger.camel-resume.level = INFO
 logger.camel-resume.additivity = false
 logger.camel-resume.appenderRef.file.ref = rolling-out
 logger.camel-resume.appenderRef.console.ref = console
@@ -46,6 +46,18 @@ logger.camel-file-resume.additivity = false
 logger.camel-file-resume.appenderRef.file.ref = rolling-out
 logger.camel-file-resume.appenderRef.console.ref = console
 
+logger.camel-kafka.name = org.apache.camel.component.kafka
+logger.camel-kafka.level = WARN
+logger.camel-kafka.additivity = false
+logger.camel-kafka.appenderRef.file.ref = rolling-out
+logger.camel-kafka.appenderRef.console.ref = console
+
+logger.camel-kafka-resume.name = org.apache.camel.processor.resume.kafka
+logger.camel-kafka-resume.level = WARN
+logger.camel-kafka-resume.additivity = false
+logger.camel-kafka-resume.appenderRef.file.ref = rolling-out
+logger.camel-kafka-resume.appenderRef.console.ref = console
+
 logger.kafka.name = org.apache.kafka
 logger.kafka.level = INFO
 logger.kafka.additivity = false


[camel-examples] 11/12: (chores) camel-resume-api-examples: minor stability cleanups

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit 7fc2c181f66c1d91d27a9eb00b1c1bed3d964862
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Aug 10 13:18:48 2022 +0200

    (chores) camel-resume-api-examples: minor stability cleanups
---
 .../java/org/apache/camel/example/resume/fileset/main/MainApp.java    | 3 ++-
 examples/resume-api/resume-api-fileset/src/main/scripts/run.sh        | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
index 8bf8bfea..1b8268e0 100644
--- a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
+++ b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
@@ -40,8 +40,9 @@ public class MainApp {
         SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = KafkaUtil.getDefaultStrategy();
         RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(resumeStrategy, new CaffeineCache<>(10000));
 
-        main.configure().addRoutesBuilder(routeBuilder);
         main.configure().addRoutesBuilder(new CheckRoute());
+        main.configure().addRoutesBuilder(routeBuilder);
+
         main.run(args);
     }
 }
diff --git a/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh b/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
index 972ad6bc..4a80ba9f 100644
--- a/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
+++ b/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
@@ -23,7 +23,7 @@ function checkResults() {
 
   echo "###**************************************************************************###"
   echo "Results: repeated items: ${repeated}"
-  echo "Results: processed items: ${processedRecords} (expected ${expectedItems})"
+  echo "Results: processed items: ${processedRecords} (expected at least ${expectedItems})"
   echo "###**************************************************************************###"
   echo "Resume simulation completed"
   echo "###**************************************************************************###"
@@ -39,7 +39,7 @@ mkdir -p ${DATA_DIR}
 ITERATIONS=${1:-5}
 BATCH_SIZE=${2:-100}
 FILE_COUNT=${3:-100}
-MAX_IDLE=5
+MAX_IDLE=10
 
 for i in $(seq 0 ${ITERATIONS}) ; do
   mkdir -p ${DATA_DIR}/${i}


[camel-examples] 04/12: (chores) camel-resume-api-examples: remove the unnecessary throttling

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit ddfa265874af2323778b4ad4259b056b7445fcd4
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 10:48:19 2022 +0200

    (chores) camel-resume-api-examples: remove the unnecessary throttling
---
 .../resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java  | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
index e5f1adec..9a34fdfd 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/fileset/LargeDirectoryRouteBuilder.java
@@ -39,13 +39,10 @@ public class LargeDirectoryRouteBuilder extends RouteBuilder {
         this.cache = cache;
     }
 
-    private void process(Exchange exchange) throws Exception {
+    private void process(Exchange exchange) {
         File path = exchange.getMessage().getHeader("CamelFilePath", File.class);
         LOG.debug("Processing {}", path.getPath());
         exchange.getMessage().setHeader(Exchange.OFFSET, Resumables.of(path.getParentFile(), path));
-
-        // Put a delay to simulate slow processing
-        Thread.sleep(50);
     }
 
     /**


[camel-examples] 08/12: (chores) camel-resume-api-examples: support checking results for the offset example

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit 9674ff8e4686bf862cd483f08a51ae2e44899012
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 17:42:26 2022 +0200

    (chores) camel-resume-api-examples: support checking results for the offset example
---
 examples/resume-api/resume-api-common/pom.xml      |  4 +++
 .../kafka/file/LargeFileRouteBuilder.java          |  8 +++++
 .../src/main/docker/Dockerfile                     |  2 +-
 .../resume-api-file-offset/src/main/scripts/run.sh | 37 +++++++++++++++++++---
 4 files changed, 46 insertions(+), 5 deletions(-)

diff --git a/examples/resume-api/resume-api-common/pom.xml b/examples/resume-api/resume-api-common/pom.xml
index deabd257..c7f6f5cf 100644
--- a/examples/resume-api/resume-api-common/pom.xml
+++ b/examples/resume-api/resume-api-common/pom.xml
@@ -35,6 +35,10 @@
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-kafka</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-direct</artifactId>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
index 0c1e6237..33451452 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.util.concurrent.CountDownLatch;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.FileConstants;
 import org.apache.camel.processor.resume.kafka.KafkaResumeStrategy;
@@ -37,6 +38,7 @@ import org.slf4j.LoggerFactory;
 public class LargeFileRouteBuilder extends RouteBuilder {
     private static final Logger LOG = LoggerFactory.getLogger(LargeFileRouteBuilder.class);
 
+    private ProducerTemplate producerTemplate;
     private KafkaResumeStrategy testResumeStrategy;
     private final ResumeCache<File> cache;
 
@@ -70,6 +72,7 @@ public class LargeFileRouteBuilder extends RouteBuilder {
 
         exchange.getMessage().setHeader(Exchange.OFFSET, Resumables.of(file, lastOffset));
 
+        producerTemplate.sendBody("direct:summary", String.valueOf(lastOffset));
         LOG.info("Read data: {} / offset key: {} / offset value: {}", body, filePath, lastOffset);
         if (latch.getCount() == 1) {
             exchange.setRouteStop(true);
@@ -82,6 +85,8 @@ public class LargeFileRouteBuilder extends RouteBuilder {
      * Let's configure the Camel routing rules using Java code...
      */
     public void configure() {
+        producerTemplate = getContext().createProducerTemplate();
+
         getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, testResumeStrategy);
         getCamelContext().getRegistry().bind(ResumeCache.DEFAULT_NAME, cache);
 
@@ -96,6 +101,9 @@ public class LargeFileRouteBuilder extends RouteBuilder {
                     .intermittent(true)
                     .process(this::process);
 
+        from("direct:summary")
+                .to("file:{{output.dir}}?fileName=summary.txt&fileExist=Append&appendChars=\n");
+
     }
 
 }
diff --git a/examples/resume-api/resume-api-file-offset/src/main/docker/Dockerfile b/examples/resume-api/resume-api-file-offset/src/main/docker/Dockerfile
index 50067719..2a32e74a 100644
--- a/examples/resume-api/resume-api-file-offset/src/main/docker/Dockerfile
+++ b/examples/resume-api/resume-api-file-offset/src/main/docker/Dockerfile
@@ -15,7 +15,7 @@
 FROM fedora:35 as resume-api-file-offset
 LABEL maintainer="orpiske@apache.org"
 
-
+ENV OUTPUT_DIR /data/output
 ENV DATA_DIR /data/source
 ENV DATA_FILE data.txt
 ENV DEPLOYMENT_DIR /deployments
diff --git a/examples/resume-api/resume-api-file-offset/src/main/scripts/run.sh b/examples/resume-api/resume-api-file-offset/src/main/scripts/run.sh
index 2158150f..be547653 100644
--- a/examples/resume-api/resume-api-file-offset/src/main/scripts/run.sh
+++ b/examples/resume-api/resume-api-file-offset/src/main/scripts/run.sh
@@ -1,3 +1,4 @@
+#!/bin/bash
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -14,6 +15,37 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+function checkResults() {
+#  expectedItems=$((ITERATIONS * BATCH_SIZE))
+#  processedRecords=$(cat ${OUTPUT_DIR}/summary.txt | wc -l)
+#  repeated=$(cat ${OUTPUT_DIR}/summary.txt | sort | uniq --count --repeated | wc -l)
+
+  expectedOffset=0
+  dataSize=11
+  errors=0
+  echo "###**************************************************************************###"
+  for line in $(cat ${OUTPUT_DIR}/summary.txt) ; do
+    expectedOffset=$((dataSize + expectedOffset))
+
+    offsetValue=$(echo $line | sed 's/^[ ]*//')
+
+    if [[ "${expectedOffset}" != "${offsetValue}" ]] ; then
+      errors=$(( errors++ ))
+      echo "Results: offset value with error = ${offsetValue} | expectedOffset = ${expectedOffset}."
+      echo "Error count: ${errors}"
+    fi
+  done
+
+  echo "Results: number of items with errors: ${errors}"
+  echo "###**************************************************************************###"
+  echo "Resume simulation completed"
+  echo "###**************************************************************************###"
+
+}
+
+trap checkResults exit SIGINT SIGABRT SIGHUP
+
+
 ITERATIONS=${1:-5}
 BATCH_SIZE=${2:-50}
 
@@ -29,7 +61,7 @@ for i in $(seq 0 ${ITERATIONS}) ; do
   echo "Running the iteration ${i} of ${ITERATIONS} with a batch of ${BATCH_SIZE} offsets"
   echo "********************************************************************************"
   java -Dinput.dir=${DATA_DIR} \
-    -Doutput.dir=/tmp/out \
+    -Doutput.dir=${OUTPUT_DIR} \
     -Dinput.file=${DATA_FILE} \
     -Dresume.type=kafka \
     -Dresume.type.kafka.topic=file-offsets \
@@ -42,7 +74,4 @@ for i in $(seq 0 ${ITERATIONS}) ; do
     sleep 2s
 done
 
-echo "###**************************************************************************###"
-echo "Resume simulation completed"
-echo "###**************************************************************************###"
 exit 0


[camel-examples] 01/12: (chores) camel-resume-api-examples: avoid blocking for too long by default

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit 907be061cba7c3db54f346d99b344db32b492db3
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 10:09:30 2022 +0200

    (chores) camel-resume-api-examples: avoid blocking for too long by default
---
 .../java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
index f943c2cc..68c97cc4 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
@@ -36,6 +36,7 @@ public final class KafkaUtil {
                 KafkaResumeStrategyConfigurationBuilder.newBuilder()
                         .withBootstrapServers(bootStrapAddress)
                         .withTopic(kafkaTopic)
+                        .withProducerProperty("max.block.ms", "10000")
                         .build();
 
         return new SingleNodeKafkaResumeStrategy<>(resumeStrategyConfiguration);


[camel-examples] 10/12: (chores) camel-resume-api-examples: avoid blocking the check route for too long

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit 3236a87bd030d1eac374c5cb0a53fce40e41f481
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Aug 10 13:18:02 2022 +0200

    (chores) camel-resume-api-examples: avoid blocking the check route for too long
---
 .../apache/camel/example/resume/strategies/kafka/check/CheckRoute.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/check/CheckRoute.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/check/CheckRoute.java
index fbc3ab02..67db65fd 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/check/CheckRoute.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/check/CheckRoute.java
@@ -6,7 +6,7 @@ public class CheckRoute extends RouteBuilder {
 
     @Override
     public void configure() {
-        from("kafka:{{resume.type.kafka.topic}}?brokers={{bootstrap.address}}")
+        from("kafka:{{resume.type.kafka.topic}}?brokers={{bootstrap.address}}&maxBlockMs=5000&pollTimeoutMs=1000")
                 .to("file:{{output.dir}}?fileName=summary.txt&fileExist=Append&appendChars=\n");
     }
 


[camel-examples] 07/12: (chores) camel-resume-api-examples: use the default strategy builder as part of CAMEL-18362

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit f2664dce9233f9cdf8adc7861cf07f37cd1f7b18
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 17:41:21 2022 +0200

    (chores) camel-resume-api-examples: use the default strategy builder as part of CAMEL-18362
---
 .../camel/example/resume/strategies/kafka/KafkaUtil.java  | 15 ---------------
 .../camel/example/resume/file/offset/main/MainApp.java    |  2 +-
 2 files changed, 1 insertion(+), 16 deletions(-)

diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
index 68c97cc4..cd4efef8 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
@@ -20,7 +20,6 @@ package org.apache.camel.example.resume.strategies.kafka;
 import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfiguration;
 import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
 import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
-import org.apache.camel.resume.Cacheable;
 import org.apache.camel.resume.Resumable;
 
 public final class KafkaUtil {
@@ -41,18 +40,4 @@ public final class KafkaUtil {
 
         return new SingleNodeKafkaResumeStrategy<>(resumeStrategyConfiguration);
     }
-
-    public static SingleNodeKafkaResumeStrategy<Resumable> getMinimizingStrategy() {
-        String bootStrapAddress = System.getProperty("bootstrap.address", "localhost:9092");
-        String kafkaTopic = System.getProperty("resume.type.kafka.topic", "offsets");
-
-        KafkaResumeStrategyConfiguration resumeStrategyConfiguration =
-                KafkaResumeStrategyConfigurationBuilder.newBuilder()
-                        .withCacheFillPolicy(Cacheable.FillPolicy.MINIMIZING)
-                        .withBootstrapServers(bootStrapAddress)
-                        .withTopic(kafkaTopic)
-                        .build();
-
-        return new SingleNodeKafkaResumeStrategy<>(resumeStrategyConfiguration);
-    }
 }
diff --git a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
index 5bc8271d..cfe99abe 100644
--- a/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
+++ b/examples/resume-api/resume-api-file-offset/src/main/java/org/apache/camel/example/resume/file/offset/main/MainApp.java
@@ -43,7 +43,7 @@ public class MainApp {
         int batchSize = Integer.valueOf(tmp);
 
         CountDownLatch latch = new CountDownLatch(batchSize);
-        SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = KafkaUtil.getMinimizingStrategy();
+        SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = KafkaUtil.getDefaultStrategy();
 
         RouteBuilder routeBuilder = new LargeFileRouteBuilder(resumeStrategy, new CaffeineCache<>(1), latch);
         main.configure().addRoutesBuilder(routeBuilder);


[camel-examples] 06/12: (chores) camel-resume-api-examples: logging cleanups for resume-api-file-offset

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit e37b65f0ccca7c0b321ef0cf40c7cbb6ca540e1b
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 17:40:15 2022 +0200

    (chores) camel-resume-api-examples: logging cleanups for resume-api-file-offset
---
 .../src/main/resources/log4j2.properties                  | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/examples/resume-api/resume-api-file-offset/src/main/resources/log4j2.properties b/examples/resume-api/resume-api-file-offset/src/main/resources/log4j2.properties
index fb11b710..f81d5941 100644
--- a/examples/resume-api/resume-api-file-offset/src/main/resources/log4j2.properties
+++ b/examples/resume-api/resume-api-file-offset/src/main/resources/log4j2.properties
@@ -27,9 +27,10 @@ logger.camel.name = org.apache.camel
 logger.camel.level = WARN
 logger.camel.additivity = false
 logger.camel.appenderRef.file.ref = rolling-out
+#logger.camel.appenderRef.console.ref = console
 
 logger.camel-resume.name = org.apache.camel.processor.resume
-logger.camel-resume.level = DEBUG
+logger.camel-resume.level = INFO
 logger.camel-resume.additivity = false
 logger.camel-resume.appenderRef.file.ref = rolling-out
 logger.camel-resume.appenderRef.console.ref = console
@@ -46,6 +47,18 @@ logger.camel-file-resume.additivity = false
 logger.camel-file-resume.appenderRef.file.ref = rolling-out
 logger.camel-file-resume.appenderRef.console.ref = console
 
+logger.camel-kafka.name = org.apache.camel.component.kafka
+logger.camel-kafka.level = WARN
+logger.camel-kafka.additivityz = false
+logger.camel-kafka.appenderRef.file.ref = rolling-out
+logger.camel-kafka.appenderRef.console.ref = console
+
+logger.camel-kafka-resume.name = org.apache.camel.processor.resume.kafka
+logger.camel-kafka-resume.level = INFO
+logger.camel-kafka-resume.additivity = false
+logger.camel-kafka-resume.appenderRef.file.ref = rolling-out
+logger.camel-kafka-resume.appenderRef.console.ref = console
+
 logger.kafka.name = org.apache.kafka
 logger.kafka.level = INFO
 logger.kafka.additivity = false


[camel-examples] 05/12: (chores) camel-resume-api-examples: add support for verifying the results

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit e17fb5fde62c0322120eafb014af087ede44e7c6
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 10:48:43 2022 +0200

    (chores) camel-resume-api-examples: add support for verifying the results
---
 .../resume-api-fileset/src/main/docker/Dockerfile  |  1 +
 .../resume-api-fileset/src/main/scripts/run.sh     | 39 ++++++++++++++++++----
 2 files changed, 33 insertions(+), 7 deletions(-)

diff --git a/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile b/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile
index f054938a..bd4353b9 100644
--- a/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile
+++ b/examples/resume-api/resume-api-fileset/src/main/docker/Dockerfile
@@ -21,6 +21,7 @@ COPY src/main/scripts/run.sh /deployments/run.sh
 
 ENV JAVA_HOME /etc/alternatives/jre
 ENV DATA_DIR /data/source
+ENV OUTPUT_DIR /data/output
 
 RUN chmod +x /deployments/*.sh
 WORKDIR /deployments/
diff --git a/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh b/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
index fff449c1..972ad6bc 100644
--- a/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
+++ b/examples/resume-api/resume-api-fileset/src/main/scripts/run.sh
@@ -1,3 +1,4 @@
+#!/bin/sh
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -14,13 +15,31 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+function checkResults() {
+  expectedItems=$((ITERATIONS * BATCH_SIZE))
+  processedRecords=$(cat ${OUTPUT_DIR}/summary.txt | wc -l)
+  repeated=$(cat ${OUTPUT_DIR}/summary.txt | sort | uniq --count --repeated | wc -l)
+
+  echo "###**************************************************************************###"
+  echo "Results: repeated items: ${repeated}"
+  echo "Results: processed items: ${processedRecords} (expected ${expectedItems})"
+  echo "###**************************************************************************###"
+  echo "Resume simulation completed"
+  echo "###**************************************************************************###"
+
+}
+
+trap checkResults exit SIGINT SIGABRT SIGHUP
+
 echo "The test will process the following directory tree:"
 
 mkdir -p ${DATA_DIR}
 
 ITERATIONS=${1:-5}
-BATCH_SIZE=${2:-50}
+BATCH_SIZE=${2:-100}
 FILE_COUNT=${3:-100}
+MAX_IDLE=5
 
 for i in $(seq 0 ${ITERATIONS}) ; do
   mkdir -p ${DATA_DIR}/${i}
@@ -35,22 +54,28 @@ for i in $(seq 0 ${ITERATIONS}) ; do
   done
 
   echo "Only the following files should processed in this execution:"
-  tree ${DATA_DIR}/${i} | pv -q -L 1014
+  find ${DATA_DIR}/${i} -type f | pv -q -L 1014
 
   java -Dinput.dir=${DATA_DIR} \
-    -Doutput.dir=/tmp/out \
+    -Doutput.dir=${OUTPUT_DIR} \
     -Dresume.type=kafka \
     -Dresume.type.kafka.topic=dir-offsets \
     -Dbootstrap.address=kafka:9092 \
     -jar /deployments/example.jar \
-    -dm ${BATCH_SIZE}
+    -di ${MAX_IDLE}
     echo "********************************************************************************"
     echo "Finished the iteration ${i}"
     echo "********************************************************************************"
     sleep 2s
+
+    if [[ -f ${OUTPUT_DIR}/summary.txt ]] ; then
+      processedRecords=$(cat ${OUTPUT_DIR}/summary.txt | wc -l)
+      echo "Processed ${processedRecords} so far"
+    fi
+
+
 done
 
-echo "###**************************************************************************###"
-echo "Resume simulation completed"
-echo "###**************************************************************************###"
+
+
 exit 0


[camel-examples] 02/12: (chores) camel-resume-api-examples: add a new route for checking the results

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit 32d671c906161d2c86ce0600682ee5c49ec55ccf
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Aug 9 10:10:11 2022 +0200

    (chores) camel-resume-api-examples: add a new route for checking the results
---
 .../example/resume/strategies/kafka/check/CheckRoute.java  | 14 ++++++++++++++
 .../apache/camel/example/resume/fileset/main/MainApp.java  |  2 ++
 2 files changed, 16 insertions(+)

diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/check/CheckRoute.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/check/CheckRoute.java
new file mode 100644
index 00000000..fbc3ab02
--- /dev/null
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/check/CheckRoute.java
@@ -0,0 +1,14 @@
+package org.apache.camel.example.resume.strategies.kafka.check;
+
+import org.apache.camel.builder.RouteBuilder;
+
+public class CheckRoute extends RouteBuilder {
+
+    @Override
+    public void configure() {
+        from("kafka:{{resume.type.kafka.topic}}?brokers={{bootstrap.address}}")
+                .to("file:{{output.dir}}?fileName=summary.txt&fileExist=Append&appendChars=\n");
+    }
+
+
+}
diff --git a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
index ae30afae..8bf8bfea 100644
--- a/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
+++ b/examples/resume-api/resume-api-fileset/src/main/java/org/apache/camel/example/resume/fileset/main/MainApp.java
@@ -20,6 +20,7 @@ package org.apache.camel.example.resume.fileset.main;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.caffeine.resume.CaffeineCache;
 import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
+import org.apache.camel.example.resume.strategies.kafka.check.CheckRoute;
 import org.apache.camel.example.resume.strategies.kafka.fileset.LargeDirectoryRouteBuilder;
 import org.apache.camel.main.Main;
 import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
@@ -40,6 +41,7 @@ public class MainApp {
         RouteBuilder routeBuilder = new LargeDirectoryRouteBuilder(resumeStrategy, new CaffeineCache<>(10000));
 
         main.configure().addRoutesBuilder(routeBuilder);
+        main.configure().addRoutesBuilder(new CheckRoute());
         main.run(args);
     }
 }


[camel-examples] 12/12: (chores) camel-resume-api: cleaned up the example with clustering

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit e97db54a6946ed28e0bca330a07d389e5d5ef798
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Aug 10 16:27:31 2022 +0200

    (chores) camel-resume-api: cleaned up the example with clustering
---
 .../clusterized/main/ClusterizedListener.java      | 22 ++++------------------
 .../ClusterizedLargeDirectoryRouteBuilder.java     |  2 +-
 2 files changed, 5 insertions(+), 19 deletions(-)

diff --git a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
index 5d113d90..42471c14 100644
--- a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
+++ b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/main/ClusterizedListener.java
@@ -20,10 +20,9 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.zookeeper.cluster.ZooKeeperClusterService;
 import org.apache.camel.example.resume.fileset.clusterized.strategies.ClusterizedLargeDirectoryRouteBuilder;
+import org.apache.camel.example.resume.strategies.kafka.KafkaUtil;
 import org.apache.camel.main.BaseMainSupport;
 import org.apache.camel.main.MainListener;
-import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfiguration;
-import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
 import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
 import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeStrategy;
@@ -55,7 +54,7 @@ class ClusterizedListener implements MainListener {
             main.getCamelContext().addService(clusterService);
 
             LOG.trace("Creating the strategy");
-            SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = newResumeStrategy();
+            SingleNodeKafkaResumeStrategy<Resumable> resumeStrategy = KafkaUtil.getDefaultStrategy();
             main.getCamelContext().getRegistry().bind(ResumeStrategy.DEFAULT_NAME, resumeStrategy);
 
             LOG.trace("Creating the route");
@@ -89,20 +88,7 @@ class ClusterizedListener implements MainListener {
 
     @Override
     public void afterStop(BaseMainSupport main) {
-        main.shutdown();
-        System.exit(0);
-    }
-
-    private static SingleNodeKafkaResumeStrategy<Resumable> newResumeStrategy() {
-        String bootStrapAddress = System.getProperty("bootstrap.address", "localhost:9092");
-        String kafkaTopic = System.getProperty("resume.type.kafka.topic", "offsets");
-
-        KafkaResumeStrategyConfiguration resumeStrategyConfiguration =
-                KafkaResumeStrategyConfigurationBuilder.newBuilder()
-                        .withBootstrapServers(bootStrapAddress)
-                        .withTopic(kafkaTopic)
-                        .build();
-
-        return new SingleNodeKafkaResumeStrategy<>(resumeStrategyConfiguration);
+//        main.shutdown();
+//        System.exit(0);
     }
 }
diff --git a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
index 842e0d85..d7340907 100644
--- a/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
+++ b/examples/resume-api/resume-api-fileset-clusterized/src/main/java/org/apache/camel/example/resume/fileset/clusterized/strategies/ClusterizedLargeDirectoryRouteBuilder.java
@@ -54,7 +54,7 @@ public class ClusterizedLargeDirectoryRouteBuilder extends RouteBuilder {
                 .routeId("heartbeat")
                 .log("HeartBeat route (timer) ...");
 
-        from("master:resume-ns:file:{{input.dir}}?noop=true&recursive=true")
+        from("master:resume-ns:file:{{input.dir}}?noop=true&recursive=true&repeatCount=1")
                 .resumable(ResumeStrategy.DEFAULT_NAME)
                 .routeId("clustered")
                 .process(this::process)


[camel-examples] 09/12: (chores) camel-resume-api-examples: adjust the initialization duration to match the launch script

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-examples.git

commit f2469b66a8f7e4bfc601b3503e7e638629445d8f
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Aug 10 13:17:26 2022 +0200

    (chores) camel-resume-api-examples: adjust the initialization duration to match the launch script
---
 .../org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java    | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
index cd4efef8..11b9783b 100644
--- a/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
+++ b/examples/resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/KafkaUtil.java
@@ -17,6 +17,8 @@
 
 package org.apache.camel.example.resume.strategies.kafka;
 
+import java.time.Duration;
+
 import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfiguration;
 import org.apache.camel.processor.resume.kafka.KafkaResumeStrategyConfigurationBuilder;
 import org.apache.camel.processor.resume.kafka.SingleNodeKafkaResumeStrategy;
@@ -36,6 +38,7 @@ public final class KafkaUtil {
                         .withBootstrapServers(bootStrapAddress)
                         .withTopic(kafkaTopic)
                         .withProducerProperty("max.block.ms", "10000")
+                        .withMaxInitializationDuration(Duration.ofSeconds(5))
                         .build();
 
         return new SingleNodeKafkaResumeStrategy<>(resumeStrategyConfiguration);