You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/01/13 12:50:09 UTC
camel git commit: Update camel-kafka documentation
Repository: camel
Updated Branches:
refs/heads/master 7085fd7b2 -> 9584f3851
Update camel-kafka documentation
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9584f385
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9584f385
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9584f385
Branch: refs/heads/master
Commit: 9584f385150e6c25b3b288988eae98c0757583c4
Parents: 7085fd7
Author: Antoine DESSAIGNE <an...@gmail.com>
Authored: Fri Jan 13 13:26:48 2017 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Fri Jan 13 13:49:54 2017 +0100
----------------------------------------------------------------------
.../src/main/docs/kafka-component.adoc | 106 ++++++++++---------
1 file changed, 54 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9584f385/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 52d0aa7..d100a32 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -137,57 +137,65 @@ The Kafka component supports 78 endpoint options which are listed below:
{% endraw %}
// endpoint options: END
-
-
-
For more information about Producer/Consumer configuration:
http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]
http://kafka.apache.org/documentation.html#producerconfigs[http://kafka.apache.org/documentation.html#producerconfigs]
-### Samples
+### Message headers
-#### Camel 2.17 or newer
+#### Consumer headers
-Consuming messages:
+The following headers are available when consuming messages from Kafka.
+[width="100%",cols="2m,2m,1m,5",options="header"]
+|=======================================================================================================
+| Header constant | Header value | Type | Description
+| KafkaConstants.TOPIC | "kafka.TOPIC" | String | The topic from where the message originated
+| KafkaConstants.PARTITION | "kafka.PARTITION" | Integer | The partition where the message was stored
+| KafkaConstants.OFFSET | "kafka.OFFSET" | Long | The offset of the message
+| KafkaConstants.KEY | "kafka.KEY" | Object | The key of the message if configured
+|=======================================================================================================
-[source,java]
--------------------------------------------------------------------------------------------------
-from("kafka:localhost:9092?topic=test&groupId=testing&autoOffsetReset=earliest&consumersCount=1")
- .process(new Processor() {
- @Override
- public void process(Exchange exchange)
- throws Exception {
- String messageKey = "";
- if (exchange.getIn() != null) {
- Message message = exchange.getIn();
- Integer partitionId = (Integer) message
- .getHeader(KafkaConstants.PARTITION);
- String topicName = (String) message
- .getHeader(KafkaConstants.TOPIC);
- if (message.getHeader(KafkaConstants.KEY) != null)
- messageKey = (String) message
- .getHeader(KafkaConstants.KEY);
- Object data = message.getBody();
-
-
- System.out.println("topicName :: "
- + topicName + " partitionId :: "
- + partitionId + " messageKey :: "
- + messageKey + " message :: "
- + data + "\n");
- }
- }
- }).to("log:input");
--------------------------------------------------------------------------------------------------
+#### Producer headers
+
+Before sending a message to Kafka you can configure the following headers.
+[width="100%",cols="2m,2m,1m,5",options="header"]
+|============================================================================================================================================================================
+| Header constant | Header value | Type | Description
+| KafkaConstants.KEY | "kafka.KEY" | Object | *Required* The key of the message in order to ensure that all related message goes in the same partition
+| KafkaConstants.TOPIC | "kafka.TOPIC" | String | The topic to which send the message (only read if the `bridgeEndpoint` endpoint parameter is `true`)
+| KafkaConstants.PARTITION_KEY | "kafka.PARTITION_KEY" | Integer | Explicitly specify the partition (only used if the `KafkaConstants.KEY` header is defined)
+|============================================================================================================================================================================
+
+After the message is sent to Kafka, the following headers are available
+[width="100%",cols="2m,2m,1m,5",options="header"]
+|==============================================================================================================================================================================================
+| Header constant | Header value | Type | Description
+| KafkaConstants.KAFKA_RECORDMETA | "org.apache.kafka.clients.producer.RecordMetadata" | List<RecordMetadata> | The metadata (only configured if `recordMetadata` endpoint parameter is `true`
+|==============================================================================================================================================================================================
+### Samples
+
+#### Consuming messages from Kafka
+
+Here is the minimal route you need in order to read messages from Kafka.
+[source,java]
+-------------------------------------------------------------
+from("kafka:localhost:9092?topic=test&groupId=testing")
+ .log("Message received from Kafka : ${body}")
+ .log(" on the topic ${headers[kafka.TOPIC]}")
+ .log(" on the partition ${headers[kafka.PARTITION]}")
+ .log(" with the offset ${headers[kafka.OFFSET]}")
+ .log(" with the key ${headers[kafka.KEY]}")
+-------------------------------------------------------------
+
When consuming messages from Kafka you can use your own offset management and not delegate this management to Kafka.
In order to keep the offsets the component needs a `StateRepository` implementation such as `FileStateRepository`.
This bean should be available in the registry.
Here how to use it :
[source,java]
--------------------------------------------------------------------------------------------------------------------
+----------------------------------------------------------------------------------------------------------------------------
// Create the repository in which the Kafka offsets will be persisted
FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat"));
@@ -207,25 +215,19 @@ camelContext.addRoutes(new RouteBuilder() {
.to("mock:result");
}
});
--------------------------------------------------------------------------------------------------------------------
+----------------------------------------------------------------------------------------------------------------------------
�
-Producing messages:
+#### Producing messages to Kafka
+Here is the minimal route you need in order to write messages to Kafka.
[source,java]
----------------------------------------------------------------------------------------------------------------
-
-from("direct:start").process(new Processor() {
- @Override
- public void process(Exchange exchange) throws Exception {
- exchange.getIn().setBody("Test Message from Camel Kafka Component Final",String.class);
- exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0);
- exchange.getIn().setHeader(KafkaConstants.KEY, "1");
- }
- }).to("kafka:localhost:9092?topic=test");
----------------------------------------------------------------------------------------------------------------
-
-�
+----------------------------------------------------------------------------
+from("direct:start")
+ .setBody(constant("Message from Camel")) // Message to send
+ .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
+ .to("kafka:localhost:9092?topic=test");
+----------------------------------------------------------------------------
### Endpoints
@@ -260,4 +262,4 @@ http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/Pollin
* link:configuring-camel.html[Configuring Camel]
* link:message-endpoint.html[Message Endpoint] pattern
* link:uris.html[URIs]
-* link:writing-components.html[Writing Components]
\ No newline at end of file
+* link:writing-components.html[Writing Components]