You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/13 18:37:14 UTC

camel git commit: Updated camel-kafka docs for KafkaIdempotentRepository. Expanded on Javadocs.

Repository: camel
Updated Branches:
  refs/heads/master d4d574fd1 -> d6b391d65


Updated camel-kafka docs for KafkaIdempotentRepository. Expanded on Javadocs.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d6b391d6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d6b391d6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d6b391d6

Branch: refs/heads/master
Commit: d6b391d65728c81316260a012a271f1a54e63f3e
Parents: d4d574f
Author: Jakub Korab <ja...@gmail.com>
Authored: Mon Mar 13 15:11:07 2017 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Mar 13 19:37:08 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          | 72 ++++++++++++++++++++
 .../kafka/KafkaIdempotentRepository.java        |  5 ++
 2 files changed, 77 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d6b391d6/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index d559d77..075ea6d 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -294,6 +294,78 @@ camelContext.addRoutes(new RouteBuilder() {
 });
 --------------------------------------------------------------------------------------------------
 
+### Using the Kafka idempotent repository (Available from Camel 2.19)
+
+The `camel-kafka` library provides a Kafka topic-based idempotent repository. This repository stores broadcasts all changes to idempotent state (add/remove) in a Kafka topic, and populates a local in-memory cache for each repository's process instance through event sourcing.
+
+The topic used must be unique per idempotent repository instance. The mechanism does not have any requirements about the number of topic partitions; as the repository consumes from all partitions at the same time. It also does not have any requirements about the replication factor of the topic.
+
+Each repository instance that uses the topic (e.g. typically on different machines running in parallel) controls its own consumer group, so in a cluster of 10 Camel processes using the same topic each will control its own offset.
+
+On startup, the instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the latest state. The cache will not be considered warmed up until one poll of `pollDurationMs` in length returns 0 records. Startup will not be completed until either the cache has warmed up, or 30 seconds go by; if the latter happens the idempotent repository may be in an inconsistent state until its consumer catches up to the end of the topic.
+
+A `KafkaIdempotentRepository` has the following properties:
+[width="100%",cols="2m,5",options="header"]
+|=======================================================================
+| Property | Description
+| topic | The name of the Kafka topic to use to broadcast changes. (required)
+| bootstrapServers | The `bootstrap.servers` property on the internal Kafka producer and consumer. Use this as shorthand if not setting `consumerConfig` and `producerConfig`. If used, this component will apply sensible default configurations for the producer and consumer.
+| producerConfig | Sets the properties that will be used by the Kafka producer that broadcasts changes. Overrides `bootstrapServers`, so must define the Kafka `bootstrap.servers` property itself
+| consumerConfig | Sets the properties that will be used by the Kafka consumer that populates the cache from the topic. Overrides `bootstrapServers`, so must define the Kafka `bootstrap.servers` property itself
+| maxCacheSize | How many of the most recently used keys should be stored in memory (default 1000).
+| pollDurationMs | The poll duration of the Kafka consumer. The local caches are updated immediately. This value will affect how far behind other peers that update their caches from the topic are relative to the idempotent consumer instance that sent the cache action message. The default value of this is 100 ms. +
+If setting this value explicitly, be aware that there is a tradeoff between the remote cache liveness and the volume of network traffic between this repository's consumer and the Kafka brokers. The cache warmup process also depends on there being one poll that fetches nothing - this indicates that the stream has been consumed up to the current point. If the poll duration is excessively long for the rate at which messages are sent on the topic, there exists a possibility that the cache cannot be warmed up and will operate in an inconsistent state relative to its peers until it catches up.
+|=======================================================================
+
+The repository can be instantiated by defining the `topic` and `bootstrapServers`, or the `producerConfig` and `consumerConfig` property sets can be explicitly defined to enable features such as SSL/SASL.
+
+To use, this repository must be placed in the Camel registry, either manually or by registration as a bean in Spring/Blueprint, as it is `CamelContext` aware.
+
+Sample usage is as follows:
+
+[source,java]
+-------------------------------------------------------------
+KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
+
+SimpleRegistry registry = new SimpleRegistry();
+registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext
+CamelContext context = new CamelContext(registry);
+
+
+// later in RouteBuilder...
+from("direct:performInsert")
+    .idempotentConsumer(header("id")).messageIdRepositoryRef("insertDbIdemRepo")
+        // once-only insert into database
+    .end()
+-------------------------------------------------------------
+
+In XML:
+
+[source,xml]
+-------------------------------------------------------------
+<!-- simple -->
+<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
+  <property name="topic" value="idempotent-db-inserts"/>
+  <property name="bootstrapServers" value="localhost:9091"/>
+</bean>
+
+<!-- complex -->
+<bean id="insertDbIdemRepo" class="org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository">
+  <property name="topic" value="idempotent-db-inserts"/>
+  <property name="maxCacheSize" value="10000"/>
+  <property name="consumerConfig">
+    <props>
+      <prop key="bootstrap.servers">localhost:9091</prop>
+    </props>
+  </property>
+  <property name="producerConfig">
+    <props>
+      <prop key="bootstrap.servers">localhost:9091</prop>
+    </props>
+  </property>
+</bean>
+-------------------------------------------------------------
+
 
 ### Endpoints
 

http://git-wip-us.apache.org/repos/asf/camel/blob/d6b391d6/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 4a5b98c..dbc1474 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -223,6 +223,11 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
      * The default value of this is {@link #DEFAULT_POLL_DURATION_MS}. If setting this value explicitly, be aware that
      * there is a tradeoff between the remote cache liveness and the volume of network traffic between this repository's
      * consumer and the Kafka brokers.
+     *
+     * The cache warmup process also depends on there being one poll that fetches nothing - this indicates that the
+     * stream has been consumed up to the current point. If the poll duration is excessively long for the rate at
+     * which messages are sent on the topic, there exists a possibility that the cache cannot be warmed up and will
+     * operate in an inconsistent state relative to its peers until it catches up.
      * @param pollDurationMs The poll duration in milliseconds.
      */
     public void setPollDurationMs(int pollDurationMs) {