Posted to commits@kafka.apache.org by jo...@apache.org on 2012/06/28 01:48:41 UTC

svn commit: r1354776 - in /incubator/kafka/site: design.html downloads.html includes/header.html quickstart.html

Author: joestein
Date: Wed Jun 27 23:48:40 2012
New Revision: 1354776

URL: http://svn.apache.org/viewvc?rev=1354776&view=rev
Log:
KAFKA-375 update the site page for the 0.7.1 release download and related changes

Modified:
    incubator/kafka/site/design.html
    incubator/kafka/site/downloads.html
    incubator/kafka/site/includes/header.html
    incubator/kafka/site/quickstart.html

Modified: incubator/kafka/site/design.html
URL: http://svn.apache.org/viewvc/incubator/kafka/site/design.html?rev=1354776&r1=1354775&r2=1354776&view=diff
==============================================================================
--- incubator/kafka/site/design.html (original)
+++ incubator/kafka/site/design.html Wed Jun 27 23:48:40 2012
@@ -330,7 +330,7 @@ The partition API uses the key and the n
 We have 2 levels of consumer APIs. The low-level "simple" API maintains a connection to a single broker and has a close correspondence to the network requests sent to the server. This API is completely stateless, with the offset being passed in on every request, allowing the user to maintain this metadata however they choose.
 </p>
 <p>
-The high-level API hides the details of brokers from the consumer and allows consuming off the cluster of machines without concern for the underlying topology. It also maintains the state of what has been consumed.
+The high-level API hides the details of brokers from the consumer and allows consuming off the cluster of machines without concern for the underlying topology. It also maintains the state of what has been consumed. The high-level API also provides the ability to subscribe to topics that match a filter expression (i.e., either a whitelist or a blacklist regular expression).
 </p>
 
 <h4>Low-level API</h4>
@@ -365,12 +365,21 @@ ConsumerConnector connector = Consumer.c
 interface ConsumerConnector {
 	
   /**
-   * This method is used to get a list of KafkaMessageStreams, which are iterators over topic.
+   * This method is used to get a list of KafkaStreams, which are iterators over
+   * MessageAndMetadata objects from which you can obtain messages and their
+   * associated metadata (currently only topic).
    *  Input: a map of &lt;topic, #streams&gt;
    *  Output: a map of &lt;topic, list of message streams&gt;
-   *          Each message stream supports a message iterator.
    */
-  public Map&lt;String,List&lt;KafkaMessageStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt; topicCountMap); 
+  public Map&lt;String,List&lt;KafkaStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt; topicCountMap); 
+
+  /**
+   * You can also obtain a list of KafkaStreams that iterate over messages
+   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
+   * whitelist or a blacklist, each of which is a standard Java regex.)
+   */
+  public List&lt;KafkaStream&gt; createMessageStreamsByFilter(
+      TopicFilter topicFilter, int numStreams);
 
   /* Commit the offsets of all messages consumed so far. */
   public commitOffsets()
@@ -380,10 +389,10 @@ interface ConsumerConnector {
 }
 </pre>
 <p>
-This API is centered around iterators, implemented by the KafkaMessageStream class. Each KafkaMessageStream represents the stream of messages from one or more partitions on one or more servers. Each stream is used for single threaded processing, so the client can provide the number of desired streams in the create call. Thus a stream may represent the merging of multiple server partitions (to correspond to the number of processing threads), but each partition only goes to one stream.
+This API is centered around iterators, implemented by the KafkaStream class. Each KafkaStream represents the stream of messages from one or more partitions on one or more servers. Each stream is used for single threaded processing, so the client can provide the number of desired streams in the create call. Thus a stream may represent the merging of multiple server partitions (to correspond to the number of processing threads), but each partition only goes to one stream.
 </p>
 <p>
-The create call registers the consumer for the topic, which results in rebalancing the consumer/broker assignment. To minimize this rebalancing the API encourages creating many topic streams in a single call.	
+The createMessageStreams call registers the consumer for the topic, which results in rebalancing the consumer/broker assignment. The API encourages creating many topic streams in a single call in order to minimize this rebalancing. The createMessageStreamsByFilter call (additionally) registers watchers to discover new topics that match its filter. Note that each stream that createMessageStreamsByFilter returns may iterate over messages from multiple topics (i.e., if multiple topics are allowed by the filter).
 </p>
 <h2>Network Layer</h2>
 <p>
@@ -392,6 +401,33 @@ The network layer is a fairly straight-f
 <h2>Messages</h2>
 <p>
 Messages consist of a fixed-size header and variable length opaque byte array payload. The header contains a format version and a CRC32 checksum to detect corruption or truncation. Leaving the payload opaque is the right decision: there is a great deal of progress being made on serialization libraries right now, and any particular choice is unlikely to be right for all uses. Needless to say a particular application using Kafka would likely mandate a particular serialization type as part of its usage. The <code>MessageSet</code> interface is simply an iterator over messages with specialized methods for bulk reading and writing to an NIO <code>Channel</code>.
+
+<h2>Message Format</h2>
+
+<pre>
+	/** 
+	 * A message. The format of an N byte message is the following: 
+	 * 
+	 * If magic byte is 0 
+	 * 
+	 * 1. 1 byte "magic" identifier to allow format changes 
+	 * 
+	 * 2. 4 byte CRC32 of the payload 
+	 * 
+	 * 3. N - 5 byte payload 
+	 * 
+	 * If magic byte is 1 
+	 * 
+	 * 1. 1 byte "magic" identifier to allow format changes 
+	 * 
+	 * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) 
+	 * 
+	 * 3. 4 byte CRC32 of the payload 
+	 * 
+	 * 4. N - 6 byte payload 
+	 * 
+	 */
+</pre>
 </p>
 <h2>Log</h2>
 <p>
@@ -545,9 +581,10 @@ When a consumer starts, it does the foll
 <ol>
    <li> Register itself in the consumer id registry under its group.
    </li>
-   <li> Register a watch on changes (new consumers joining or any existing consumers leaving) under the consumer id registry. Each change triggers rebalancing among all consumers within the group to which the changed consumer belongs.
+   <li> Register a watch on changes (new consumers joining or any existing consumers leaving) under the consumer id registry. (Each change triggers rebalancing among all consumers within the group to which the changed consumer belongs.)
    </li>
-   <li> Register a watch on changes (new brokers joining or any existing brokers leaving) under the broker id registry.  Each change triggers rebalancing among all consumers in all consumer groups. </li>
+   <li> Register a watch on changes (new brokers joining or any existing brokers leaving) under the broker id registry. (Each change triggers rebalancing among all consumers in all consumer groups.) </li>
+   <li> If the consumer creates a message stream using a topic filter, it also registers a watch on changes (new topics being added) under the broker topic registry. (Each change will trigger re-evaluation of the available topics to determine which topics are allowed by the topic filter. A new allowed topic will trigger rebalancing among all consumers within the consumer group.)</li>
    <li> Force itself to rebalance within its consumer group.
    </li>
 </ol>
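
Note on the Message Format comment added to design.html in this commit: the two layouts it describes (magic byte 0 and magic byte 1) can be illustrated with a small stand-alone sketch. This is not Kafka's own Message class. The class name MessageFormatSketch and its methods are hypothetical, and the sketch assumes the CRC32 is written as a 4-byte big-endian integer, which the comment does not state.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.CRC32;

// Hypothetical helper, not part of Kafka: encodes and decodes the two
// layouts described in the Message Format comment.
final class MessageFormatSketch {

    /** Number of header bytes that precede the payload for a magic byte. */
    static int headerSize(byte magic) {
        if (magic == 0) return 1 + 4;      // magic + CRC32
        if (magic == 1) return 1 + 1 + 4;  // magic + attributes + CRC32
        throw new IllegalArgumentException("unknown magic byte: " + magic);
    }

    /** CRC32 over a slice of a byte array, as an unsigned 32-bit value. */
    static long crc32(byte[] bytes, int offset, int length) {
        CRC32 crc = new CRC32();
        crc.update(bytes, offset, length);
        return crc.getValue();
    }

    /** Builds an N byte message; attributes is ignored when magic is 0. */
    static byte[] encode(byte magic, byte attributes, byte[] payload) {
        // ByteBuffer is big-endian by default (an assumption of this sketch).
        ByteBuffer buf = ByteBuffer.allocate(headerSize(magic) + payload.length);
        buf.put(magic);
        if (magic == 1) {
            buf.put(attributes);
        }
        buf.putInt((int) crc32(payload, 0, payload.length));
        buf.put(payload);
        return buf.array();
    }

    /** Verifies the checksum and returns the payload bytes. */
    static byte[] decodePayload(byte[] message) {
        if (message.length < 1) {
            throw new IllegalArgumentException("empty message");
        }
        int header = headerSize(message[0]);
        if (message.length < header) {
            throw new IllegalArgumentException("truncated header");
        }
        // The CRC occupies the last 4 bytes of the header in both layouts.
        long stored = ByteBuffer.wrap(message).getInt(header - 4) & 0xFFFFFFFFL;
        long actual = crc32(message, header, message.length - header);
        if (stored != actual) {
            throw new IllegalStateException("CRC mismatch: payload corrupted or truncated");
        }
        return Arrays.copyOfRange(message, header, message.length);
    }
}
```

With these definitions, encoding a 5 byte payload yields a 10 byte message for magic 0 and an 11 byte message for magic 1, matching the "N - 5" and "N - 6" payload sizes in the comment.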

Modified: incubator/kafka/site/downloads.html
URL: http://svn.apache.org/viewvc/incubator/kafka/site/downloads.html?rev=1354776&r1=1354775&r2=1354776&view=diff
==============================================================================
--- incubator/kafka/site/downloads.html (original)
+++ incubator/kafka/site/downloads.html Wed Jun 27 23:48:40 2012
@@ -2,20 +2,29 @@
 
 <h2>Downloads</h2>
 
-The current stable version is 0.7.0-incubating. You can verify your download by following these <a href="http://www.apache.org/info/verification.html">procedures</a> and using these <a href="http://svn.apache.org/repos/asf/incubator/kafka/KEYS">KEYS</a>.
+The current stable version is 0.7.1-incubating. You can verify your download by following these <a href="http://www.apache.org/info/verification.html">procedures</a> and using these <a href="http://svn.apache.org/repos/asf/incubator/kafka/KEYS">KEYS</a>.
 
-<h3>0.7.0 Release</h3>
+<h3>0.7.1 Release</h3>
 <ul>
 	<li>
-		<a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/RELEASE-NOTES.html">Release Notes</a>
+		<a href="https://www.apache.org/dyn/closer.cgi/incubator/kafka/kafka-0.7.1-incubating/RELEASE-NOTES.html">Release Notes</a>
 	</li>
  	<li>
-		Download: <a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz">kafka-0.7.0-incubating-src.tar.gz</a> (<a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz.asc">asc</a>, <a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz.md5">md5</a>)
+		Download: <a href="https://www.apache.org/dyn/closer.cgi/incubator/kafka/kafka-0.7.1-incubating/kafka-0.7.1-incubating-src.tgz">kafka-0.7.1-incubating-src.tgz</a> (<a href="https://www.apache.org/dyn/closer.cgi/incubator/kafka/kafka-0.7.1-incubating/kafka-0.7.1-incubating-src.tgz.asc">asc</a>)
 	</li>
 </ul>
 <h3>Older Releases</h3>
+	<h4>0.7.0 Release</h4>
+	<ul>
+		<li>
+			<a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/RELEASE-NOTES.html">Release Notes</a>
+		</li>
+	 	<li>
+			Download: <a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz">kafka-0.7.0-incubating-src.tar.gz</a> (<a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz.asc">asc</a>, <a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz.md5">md5</a>)
+		</li>
+	</ul>
 <p>
-You can download the previous releases <a href="http://sna-projects.com/kafka/downloads.php">here</a>.
+You can download releases previous to 0.7.0-incubating <a href="http://sna-projects.com/kafka/downloads.php">here</a>.
 </p>
 
 <h3>Disclaimer</h3>

Modified: incubator/kafka/site/includes/header.html
URL: http://svn.apache.org/viewvc/incubator/kafka/site/includes/header.html?rev=1354776&r1=1354775&r2=1354776&view=diff
==============================================================================
--- incubator/kafka/site/includes/header.html (original)
+++ incubator/kafka/site/includes/header.html Wed Jun 27 23:48:40 2012
@@ -26,7 +26,7 @@
 			<div class="lsidebar">
 				<ul>
 					<li><a href="downloads.html">download</a></li>
-					<li><a href="http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/docs">api&nbsp;docs</a></li>
+					<li><a href="http://people.apache.org/~joestein/kafka-0.7.1-incubating-docs">api&nbsp;docs</a></li>
 					<li><a href="quickstart.html">quickstart</a></li>
 					<li><a href="design.html">design</a></li>
 					<li><a href="configuration.html">configuration</a></li>

Modified: incubator/kafka/site/quickstart.html
URL: http://svn.apache.org/viewvc/incubator/kafka/site/quickstart.html?rev=1354776&r1=1354775&r2=1354776&view=diff
==============================================================================
--- incubator/kafka/site/quickstart.html (original)
+++ incubator/kafka/site/quickstart.html Wed Jun 27 23:48:40 2012
@@ -245,19 +245,19 @@ ConsumerConfig consumerConfig = new Cons
 ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
 
 // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
-Map&lt;String, List&lt;KafkaMessageStream&lt;Message&gt;&gt;&gt; topicMessageStreams = 
+Map&lt;String, List&lt;KafkaStream&lt;Message&gt;&gt;&gt; topicMessageStreams = 
     consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
-List&lt;KafkaMessageStream&lt;Message&gt;&gt; streams = topicMessageStreams.get("test");
+List&lt;KafkaStream&lt;Message&gt;&gt; streams = topicMessageStreams.get("test");
 
 // create list of 4 threads to consume from each of the partitions 
 ExecutorService executor = Executors.newFixedThreadPool(4);
 
 // consume the messages in the threads
-for(final KafkaMessageStream&lt;Message&gt; stream: streams) {
+for(final KafkaStream&lt;Message&gt; stream: streams) {
   executor.submit(new Runnable() {
     public void run() {
-      for(Message message: stream) {
-        // process message
+      for(MessageAndMetadata msgAndMetadata: stream) {
+        // process message (msgAndMetadata.message())
       }	
     }
   });
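
Note on the quickstart.html hunk above: the one-thread-per-stream pattern can be tried without a broker using the sketch below. It is only an illustration under stated assumptions: plain Iterable<String> values stand in for KafkaStream, the class name StreamPerThreadSketch and the method consumeAll are hypothetical, and the stand-in streams are finite, whereas a real stream typically blocks waiting for new messages. The executor shutdown at the end is an addition that the quickstart snippet does not show.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in, not part of Kafka: each Iterable<String> plays the
// role of one KafkaStream and is drained by exactly one thread.
final class StreamPerThreadSketch {

    /** Consumes every stream on its own thread; returns the message count. */
    static int consumeAll(List<? extends Iterable<String>> streams) {
        final AtomicInteger processed = new AtomicInteger();
        // One thread per stream, mirroring the pool of 4 used for 4 streams.
        ExecutorService executor =
            Executors.newFixedThreadPool(Math.max(1, streams.size()));
        for (final Iterable<String> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    for (String message : stream) {
                        // process message; here we only count it
                        processed.incrementAndGet();
                    }
                }
            });
        }
        // Stop accepting work and wait for the consumer threads to finish.
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return processed.get();
    }
}
```

Because each stream is handed to a single thread, no synchronization is needed inside the per-stream loop; only the shared counter is atomic.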