You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/01/13 12:50:09 UTC

camel git commit: Update camel-kafka documentation

Repository: camel
Updated Branches:
  refs/heads/master 7085fd7b2 -> 9584f3851


Update camel-kafka documentation


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9584f385
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9584f385
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9584f385

Branch: refs/heads/master
Commit: 9584f385150e6c25b3b288988eae98c0757583c4
Parents: 7085fd7
Author: Antoine DESSAIGNE <an...@gmail.com>
Authored: Fri Jan 13 13:26:48 2017 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Fri Jan 13 13:49:54 2017 +0100

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          | 106 ++++++++++---------
 1 file changed, 54 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9584f385/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 52d0aa7..d100a32 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -137,57 +137,65 @@ The Kafka component supports 78 endpoint options which are listed below:
 {% endraw %}
 // endpoint options: END
 
-
-
-
 For more information about Producer/Consumer configuration:
 
 http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]
 http://kafka.apache.org/documentation.html#producerconfigs[http://kafka.apache.org/documentation.html#producerconfigs]
 
-### Samples
+### Message headers
 
-#### Camel 2.17 or newer
+#### Consumer headers
 
-Consuming messages:
+The following headers are available when consuming messages from Kafka.
+[width="100%",cols="2m,2m,1m,5",options="header"]
+|=======================================================================================================
+| Header constant          | Header value      | Type    | Description
+| KafkaConstants.TOPIC     | "kafka.TOPIC"     | String  | The topic from where the message originated
+| KafkaConstants.PARTITION | "kafka.PARTITION" | Integer | The partition where the message was stored
+| KafkaConstants.OFFSET    | "kafka.OFFSET"    | Long    | The offset of the message
+| KafkaConstants.KEY       | "kafka.KEY"       | Object  | The key of the message if configured
+|=======================================================================================================
 
-[source,java]
--------------------------------------------------------------------------------------------------
-from("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
-                        .process(new Processor() {
-                            @Override
-                            public void process(Exchange exchange)
-                                    throws Exception {
-                                String messageKey = "";
-                                if (exchange.getIn() != null) {
-                                    Message message = exchange.getIn();
-                                    Integer partitionId = (Integer) message
-                                            .getHeader(KafkaConstants.PARTITION);
-                                    String topicName = (String) message
-                                            .getHeader(KafkaConstants.TOPIC);
-                                    if (message.getHeader(KafkaConstants.KEY) != null)
-                                        messageKey = (String) message
-                                                .getHeader(KafkaConstants.KEY);
-                                    Object data = message.getBody();
-
-
-                                    System.out.println("topicName :: "
-                                            + topicName + " partitionId :: "
-                                            + partitionId + " messageKey :: "
-                                            + messageKey + " message :: "
-                                            + data + "\n");
-                                }
-                            }
-                        }).to("log:input");
--------------------------------------------------------------------------------------------------
+#### Producer headers
+
+Before sending a message to Kafka you can configure the following headers.
+[width="100%",cols="2m,2m,1m,5",options="header"]
+|============================================================================================================================================================================
+| Header constant              | Header value          | Type    | Description
+| KafkaConstants.KEY           | "kafka.KEY"           | Object  | *Required* The key of the message in order to ensure that all related messages go to the same partition
+| KafkaConstants.TOPIC         | "kafka.TOPIC"         | String  | The topic to which the message is sent (only read if the `bridgeEndpoint` endpoint parameter is `true`)
+| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition (only used if the `KafkaConstants.KEY` header is defined)
+|============================================================================================================================================================================
+
+After the message is sent to Kafka, the following headers are available.
+[width="100%",cols="2m,2m,1m,5",options="header"]
+|==============================================================================================================================================================================================
+| Header constant                 | Header value                                       | Type                 | Description
+| KafkaConstants.KAFKA_RECORDMETA | "org.apache.kafka.clients.producer.RecordMetadata" | List<RecordMetadata> | The metadata (only configured if `recordMetadata` endpoint parameter is `true`)
+|==============================================================================================================================================================================================
 
 
+### Samples
+
+#### Consuming messages from Kafka
+
+Here is the minimal route you need in order to read messages from Kafka.
+[source,java]
+-------------------------------------------------------------
+from("kafka:localhost:9092?topic=test&groupId=testing")
+    .log("Message received from Kafka : ${body}")
+    .log("    on the topic ${headers[kafka.TOPIC]}")
+    .log("    on the partition ${headers[kafka.PARTITION]}")
+    .log("    with the offset ${headers[kafka.OFFSET]}")
+    .log("    with the key ${headers[kafka.KEY]}")
+-------------------------------------------------------------
+
 When consuming messages from Kafka you can use your own offset management and not delegate this management to Kafka.
 In order to keep the offsets the component needs a `StateRepository` implementation such as `FileStateRepository`.
 This bean should be available in the registry.
 Here how to use it :
 [source,java]
--------------------------------------------------------------------------------------------------------------------
+----------------------------------------------------------------------------------------------------------------------------
 // Create the repository in which the Kafka offsets will be persisted
 FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));
 
@@ -207,25 +215,19 @@ camelContext.addRoutes(new RouteBuilder() {
                 .to("mock:result");
     }
 });
--------------------------------------------------------------------------------------------------------------------
+----------------------------------------------------------------------------------------------------------------------------
 �
 
-Producing messages:
+#### Producing messages to Kafka
 
+Here is the minimal route you need in order to write messages to Kafka.
 [source,java]
----------------------------------------------------------------------------------------------------------------
-
-from("direct:start").process(new Processor() {
-                    @Override
-                    public void process(Exchange exchange) throws Exception {
-                        exchange.getIn().setBody("Test Message from Camel Kafka Component Final",String.class);
-                        exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
-                        exchange.getIn().setHeader(KafkaConstants.KEY, "1");
-                    }
-                }).to("kafka:localhost:9092?topic=test");
----------------------------------------------------------------------------------------------------------------
-
-�
+----------------------------------------------------------------------------
+from("direct:start")
+    .setBody(constant("Message from Camel"))          // Message to send
+    .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
+    .to("kafka:localhost:9092?topic=test");
+----------------------------------------------------------------------------
 
 ### Endpoints
 
@@ -260,4 +262,4 @@ http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/Pollin
 * link:configuring-camel.html[Configuring Camel]
 * link:message-endpoint.html[Message Endpoint] pattern
 * link:uris.html[URIs]
-* link:writing-components.html[Writing Components]
\ No newline at end of file
+* link:writing-components.html[Writing Components]