Posted to commits@kafka.apache.org by ju...@apache.org on 2013/04/29 01:50:08 UTC
svn commit: r1476865 [2/2] - in /kafka/site: ./ 07/ 07/images/ 07/includes/ images/ includes/
Added: kafka/site/07/quickstart.html
URL: http://svn.apache.org/viewvc/kafka/site/07/quickstart.html?rev=1476865&view=auto
==============================================================================
--- kafka/site/07/quickstart.html (added)
+++ kafka/site/07/quickstart.html Sun Apr 28 23:50:07 2013
@@ -0,0 +1,310 @@
+<!--#include virtual="includes/header.html" -->
+
+<h2>Quick Start</h2>
+
+<h3> Step 1: Download the code </h3>
+
+<a href="downloads.html" title="Kafka downloads">Download</a> a recent stable release.
+
+<pre>
+<b>> tar xzf kafka-<VERSION>.tgz</b>
+<b>> cd kafka-<VERSION></b>
+<b>> ./sbt update</b>
+<b>> ./sbt package</b>
+</pre>
+
+<h3>Step 2: Start the server</h3>
+
+Kafka uses zookeeper; brokers and consumers rely on it for co-ordination.
+<p>
+First start the zookeeper server. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node zookeeper instance.
+
+<pre>
+<b>> bin/zookeeper-server-start.sh config/zookeeper.properties</b>
+[2010-11-21 23:45:02,335] INFO Reading configuration from: config/zookeeper.properties
+...
+</pre>
+
+Now start the Kafka server:
+<pre>
+<b>> bin/kafka-server-start.sh config/server.properties</b>
+[2010-11-21 23:51:39,608] INFO starting log cleaner every 60000 ms (kafka.log.LogManager)
+[2010-11-21 23:51:39,628] INFO connecting to ZK: localhost:2181 (kafka.server.KafkaZooKeeper)
+...
+</pre>
+
+<h3>Step 3: Send some messages</h3>
+
+Kafka comes with a command line client that takes input from standard in and sends it out as messages to the Kafka cluster. By default each line is sent as a separate message. The topic <i>test</i> is created automatically when messages are sent to it. Omitting the logging output, you should see something like this:
+
+<pre>
+<b>> bin/kafka-console-producer.sh --zookeeper localhost:2181 --topic test</b>
+This is a message
+This is another message
+</pre>
+
+<h3>Step 4: Start a consumer</h3>
+
+Kafka also has a command line consumer that will dump out messages to standard out.
+
+<pre>
+<b>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning</b>
+This is a message
+This is another message
+</pre>
+<p>
+If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.
+</p>
+<p>
+Both of these command line tools have additional options. Running the command with no arguments will display usage information documenting them in more detail.
+</p>
+
+<h3>Step 5: Write some code</h3>
+
+Below are some very simple examples of using Kafka for sending messages; more complete examples can be found in the Kafka source code in the examples/ directory.
+
+<h4>Producer Code</h4>
+
+<h5>Producer API </h5>
+
+Here are examples of using the producer API - <code>kafka.producer.Producer<T></code> -
+
+<ol>
+<li>First, start a local instance of the zookeeper server
+<pre>./bin/zookeeper-server-start.sh config/zookeeper.properties</pre>
+</li>
+<li>Next, start a kafka broker
+<pre>./bin/kafka-server-start.sh config/server.properties</pre>
+</li>
+<li>Now, create the producer with all configuration defaults and use zookeeper based broker discovery.
+<pre>
+import java.util.List;
+import java.util.Properties;
+import kafka.javaapi.producer.Producer;
+import kafka.javaapi.producer.ProducerData;
+import kafka.producer.ProducerConfig;
+
+...
+
+Properties props = new Properties();
+props.put("zk.connect", "127.0.0.1:2181");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+ProducerConfig config = new ProducerConfig(props);
+Producer<String, String> producer = new Producer<String, String>(config);
+</pre>
+</li>
+<li>Send a single message
+<pre>
+<small>// The message is sent to a randomly selected partition registered in ZK</small>
+ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");
+producer.send(data);
+</pre>
+</li>
+<li>Send multiple messages to multiple topics in one request
+<pre>
+List<String> messages = new java.util.ArrayList<String>();
+messages.add("test-message1");
+messages.add("test-message2");
+ProducerData<String, String> data1 = new ProducerData<String, String>("test-topic1", messages);
+ProducerData<String, String> data2 = new ProducerData<String, String>("test-topic2", messages);
+List<ProducerData<String, String>> dataForMultipleTopics = new ArrayList<ProducerData<String, String>>();
+dataForMultipleTopics.add(data1);
+dataForMultipleTopics.add(data2);
+producer.send(dataForMultipleTopics);
+</pre>
+</li>
+<li>Send a message with a partition key. Messages with the same key are sent to the same partition
+<pre>
+ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-key", "test-message");
+producer.send(data);
+</pre>
+</li>
+<li>Use your custom partitioner
+<p>If you are using zookeeper based broker discovery, <code>kafka.producer.Producer<T></code> routes your data to a particular broker partition based on a <code>kafka.producer.Partitioner<T></code>, specified through the <code>partitioner.class</code> config parameter. It defaults to <code>kafka.producer.DefaultPartitioner</code>. If you don't supply a partition key, then it sends each request to a random broker partition.</p>
+<pre>
+class MemberIdPartitioner extends Partitioner[MemberIdLocation] {
+ def partition(data: MemberIdLocation, numPartitions: Int): Int = {
+ (data.location.hashCode % numPartitions)
+ }
+}
+<small>// create the producer config to plug in the above partitioner</small>
+Properties props = new Properties();
+props.put("zk.connect", "127.0.0.1:2181");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+props.put("partitioner.class", "xyz.MemberIdPartitioner");
+ProducerConfig config = new ProducerConfig(props);
+Producer<String, String> producer = new Producer<String, String>(config);
+</pre>
+</li>
+<li>Use custom Encoder
+<p>The producer takes in a required config parameter <code>serializer.class</code> that specifies an <code>Encoder<T></code> to convert T to a Kafka Message. Default is the no-op kafka.serializer.DefaultEncoder.
+Here is an example of a custom Encoder -</p>
+<pre>
+class TrackingDataSerializer extends Encoder[TrackingData] {
+  <small>// Say you want to use your own custom Avro encoding</small>
+  val avroEncoder = new CustomAvroEncoder()
+  def toMessage(event: TrackingData): Message = {
+    new Message(avroEncoder.getBytes(event))
+  }
+}
+</pre>
+If you want to use the above Encoder, pass it in to the "serializer.class" config parameter
+<pre>
+Properties props = new Properties();
+props.put("serializer.class", "xyz.TrackingDataSerializer");
+</pre>
+</li>
+<li>Use a static list of brokers, instead of zookeeper based broker discovery
+<p>Some applications would rather not depend on zookeeper. In that case, the config parameter <code>broker.list</code>
+can be used to specify the list of all brokers in your Kafka cluster, in the following format -
+<code>broker_id1:host1:port1, broker_id2:host2:port2...</code></p>
+<pre>
+<small>// you can stop the zookeeper instance as it is no longer required</small>
+./bin/zookeeper-server-stop.sh
+<small>// create the producer config object </small>
+Properties props = new Properties();
+props.put("broker.list", "0:localhost:9092");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+ProducerConfig config = new ProducerConfig(props);
+<small>// send a message using default partitioner </small>
+Producer<String, String> producer = new Producer<String, String>(config);
+List<String> messages = new java.util.ArrayList<String>();
+messages.add("test-message");
+ProducerData<String, String> data = new ProducerData<String, String>("test-topic", messages);
+producer.send(data);
+</pre>
+</li>
+<li>Use the asynchronous producer along with GZIP compression. This buffers writes in memory until either <code>batch.size</code> or <code>queue.time</code> is reached. After that, data is sent to the Kafka brokers
+<pre>
+Properties props = new Properties();
+props.put("zk.connect", "127.0.0.1:2181");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+props.put("producer.type", "async");
+props.put("compression.codec", "1");
+ProducerConfig config = new ProducerConfig(props);
+Producer<String, String> producer = new Producer<String, String>(config);
+ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message");
+producer.send(data);
+</pre>
+</li>
+<li>Finally, the producer should be closed:
+<pre>producer.close();</pre>
+</li>
+</ol>
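The key-based routing in step 5 above can be sketched without any Kafka dependency. This is only an illustration of the hash-mod-partitions idea described in the partitioner step; the class and method names are made up for the example, and it is not the actual kafka.producer.DefaultPartitioner source.

```java
// Illustrative sketch: why messages with the same key land on the same partition.
// (Hypothetical names; not part of the Kafka API.)
public class PartitionSketch {
    static int partitionFor(String key, int numPartitions) {
        // mask to a non-negative hash, then take it modulo the partition count
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // the same key always maps to the same partition
        System.out.println(partitionFor("test-key", 4) == partitionFor("test-key", 4));
    }
}
```

A custom Partitioner plugs a different function into this same slot: any deterministic mapping from key to a value in [0, numPartitions) works.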
+
+<h5>Log4j appender </h5>
+
+Data can also be produced to a Kafka server in the form of a log4j appender. In this way, minimal code needs to be written in order to send some data across to the Kafka server.
+Here is an example of how to use the Kafka Log4j appender -
+
+Start by defining the Kafka appender in your log4j.properties file.
+<pre>
+<small>// define the kafka log4j appender config parameters</small>
+log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
+<small>// <b>REQUIRED</b>: set the hostname of the kafka server</small>
+log4j.appender.KAFKA.Host=localhost
+<small>// <b>REQUIRED</b>: set the port on which the Kafka server is listening for connections</small>
+log4j.appender.KAFKA.Port=9092
+<small>// <b>REQUIRED</b>: the topic under which the logger messages are to be posted</small>
+log4j.appender.KAFKA.Topic=test
+<small>// the serializer to be used to turn an object into a Kafka message. Defaults to kafka.producer.DefaultStringEncoder</small>
+log4j.appender.KAFKA.Serializer=kafka.test.AppenderStringSerializer
+<small>// do not set the above KAFKA appender as the root appender</small>
+log4j.rootLogger=INFO
+<small>// set the logger for your package to be the KAFKA appender</small>
+log4j.logger.your.test.package=INFO, KAFKA
+</pre>
+
+Data can be sent using a log4j appender as follows -
+
+<pre>
+Logger logger = Logger.getLogger([your.test.class]);
+logger.info("message from log4j appender");
+</pre>
+
+If your log4j appender fails to send messages, please verify that the correct
+log4j properties file is being used. You can add
+<code>-Dlog4j.debug=true</code> to your VM parameters to verify this.
+
+<h4>Consumer Code</h4>
+
+The consumer code is slightly more complex as it enables multithreaded consumption:
+
+<pre>
+// specify some consumer properties
+Properties props = new Properties();
+props.put("zk.connect", "localhost:2181");
+props.put("zk.connectiontimeout.ms", "1000000");
+props.put("groupid", "test_group");
+
+// Create the connection to the cluster
+ConsumerConfig consumerConfig = new ConsumerConfig(props);
+ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
+
+// create 4 partitions of the stream for topic "test", to allow 4 threads to consume
+Map<String, List<KafkaStream<Message>>> topicMessageStreams =
+ consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
+List<KafkaStream<Message>> streams = topicMessageStreams.get("test");
+
+// create list of 4 threads to consume from each of the partitions
+ExecutorService executor = Executors.newFixedThreadPool(4);
+
+// consume the messages in the threads
+for(final KafkaStream<Message> stream: streams) {
+ executor.submit(new Runnable() {
+ public void run() {
+ for(MessageAndMetadata msgAndMetadata: stream) {
+ // process message (msgAndMetadata.message())
+ }
+ }
+ });
+}
+</pre>
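The threading pattern above can be exercised without a broker. The sketch below uses BlockingQueue as a stand-in for KafkaStream, purely to show the one-thread-per-stream dispatch; the class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Library-free sketch of the one-thread-per-stream consumer pattern.
// BlockingQueue stands in for KafkaStream; "EOF" marks the end of a stream.
public class StreamThreadingSketch {
    public static int consumeAll(List<BlockingQueue<String>> streams) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        final AtomicInteger consumed = new AtomicInteger();
        for (final BlockingQueue<String> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        // drain this thread's stream until the end marker
                        String msg;
                        while (!(msg = stream.take()).equals("EOF")) {
                            consumed.incrementAndGet(); // process message here
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return consumed.get();
    }

    public static void main(String[] args) throws Exception {
        List<BlockingQueue<String>> streams = new ArrayList<BlockingQueue<String>>();
        for (int i = 0; i < 4; i++) {
            BlockingQueue<String> q = new LinkedBlockingQueue<String>();
            q.add("message-" + i);
            q.add("EOF");
            streams.add(q);
        }
        System.out.println(consumeAll(streams)); // prints 4
    }
}
```

The key point the real code shares with this sketch: the stream count requested from createMessageStreams should match the thread pool size, so each stream is drained by exactly one thread.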
+
+<h4>Hadoop Consumer</h4>
+
+<p>
+Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers).
+</p>
+
+<p>
+Usage information on the hadoop consumer can be found <a href="https://github.com/kafka-dev/kafka/tree/master/contrib/hadoop-consumer">here</a>.
+</p>
+
+<h4>Simple Consumer</h4>
+
+Kafka has a lower-level consumer API for reading message chunks directly from servers. Under most circumstances this should not be needed. But just in case, its usage is as follows:
+
+<pre>
+import kafka.api.FetchRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import kafka.message.MessageSet;
+import kafka.utils.Utils;
+
+...
+
+<small>// create a consumer to connect to the kafka server running on localhost, port 9092, socket timeout of 10 secs, socket receive buffer of ~1MB</small>
+SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 10000, 1024000);
+
+long offset = 0;
+while (true) {
+ <small>// create a fetch request for topic "test", partition 0, current offset, and fetch size of 1MB</small>
+ FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000);
+
+ <small>// get the message set from the consumer and print them out</small>
+ ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
+ for(MessageAndOffset msg : messages) {
+ System.out.println("consumed: " + Utils.toString(msg.message.payload(), "UTF-8"));
+ <small>// advance the offset after consuming each message</small>
+ offset = msg.offset;
+ }
+}
+</pre>
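The essential bookkeeping in that loop - fetch from the current offset, then advance the offset past each consumed message - can be modeled with an in-memory log. This is only an illustration of the offset mechanics, with invented names, not the Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the SimpleConsumer fetch loop: a fetch returns the messages
// starting at the requested offset, and the consumer advances as it reads.
public class OffsetLoopSketch {
    static List<String> log = new ArrayList<String>();

    // returns at most `max` messages starting at `offset`
    static List<String> fetch(long offset, int max) {
        int from = (int) offset;
        int to = Math.min(log.size(), from + max);
        return log.subList(from, to);
    }

    public static void main(String[] args) {
        log.add("m0"); log.add("m1"); log.add("m2");
        long offset = 0;
        List<String> seen = new ArrayList<String>();
        while (offset < log.size()) {
            for (String msg : fetch(offset, 2)) {
                seen.add(msg);
                offset += 1; // advance past each consumed message
            }
        }
        System.out.println(seen); // prints [m0, m1, m2]
    }
}
```

If the consumer forgot to advance the offset, every fetch would return the same chunk; in the real API the next offset comes from each MessageAndOffset rather than a simple increment.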
+
+<!--#include virtual="includes/footer.html" -->
+
Modified: kafka/site/includes/header.html
URL: http://svn.apache.org/viewvc/kafka/site/includes/header.html?rev=1476865&r1=1476864&r2=1476865&view=diff
==============================================================================
--- kafka/site/includes/header.html (original)
+++ kafka/site/includes/header.html Sun Apr 28 23:50:07 2013
@@ -26,12 +26,10 @@
<div class="lsidebar">
<ul>
<li><a href="downloads.html">download</a></li>
- <li><a href="http://people.apache.org/~joestein/kafka-0.7.1-incubating-docs">api docs</a></li>
- <li><a href="quickstart.html">quickstart</a></li>
- <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/Clients">clients</a></li>
- <li><a href="design.html">design</a></li>
- <li><a href="configuration.html">configuration</a></li>
- <li><a href="performance.html">performance</a></li>
+ <li>releases</li>
+ <ul>
+ <li><a href="07/quickstart.html">0.7</a></li>
+ </ul>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA/Operations">operations</a></li>
<li><a href="faq.html">faq</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/KAFKA">wiki</li>
Modified: kafka/site/index.html
URL: http://svn.apache.org/viewvc/kafka/site/index.html?rev=1476865&r1=1476864&r2=1476865&view=diff
==============================================================================
--- kafka/site/index.html (original)
+++ kafka/site/index.html Sun Apr 28 23:50:07 2013
@@ -14,7 +14,7 @@ Kafka provides a publish-subscribe solut
</p>
<p>
-The use for activity stream processing makes Kafka comparable to <a href="https://github.com/facebook/scribe">Facebook's Scribe</a> or <a href="http://flume.apache.org">Apache Flume</a> (incubating), though the architecture and primitives are very different for these systems and make Kafka more comparable to a traditional messaging system. See our <a href="design.html">design</a> page for more details.
+The use for activity stream processing makes Kafka comparable to <a href="https://github.com/facebook/scribe">Facebook's Scribe</a> or <a href="http://flume.apache.org">Apache Flume</a> (incubating), though the architecture and primitives are very different for these systems and make Kafka more comparable to a traditional messaging system.
</p>
<!--#include virtual="includes/footer.html" -->
Modified: kafka/site/projects.html
URL: http://svn.apache.org/viewvc/kafka/site/projects.html?rev=1476865&r1=1476864&r2=1476865&view=diff
==============================================================================
--- kafka/site/projects.html (original)
+++ kafka/site/projects.html Sun Apr 28 23:50:07 2013
@@ -35,7 +35,7 @@ Below is a list of projects which would
<h3>Clients In Other Languages</h3>
<p>
-We offer a JVM-based client for production and consumption and also a rather primitive native python client. It would be great to improve this list. The lower-level protocols are well documented <a href="design.html">here</a> and should be relatively easy to implement in any language that supports standard socket I/O.
+We offer a JVM-based client for production and consumption. It would be great to implement the client in other languages.
</p>
<h3>Convert Hadoop InputFormat or OutputFormat to Scala</h3>