Posted to commits@kafka.apache.org by ju...@apache.org on 2015/11/10 02:09:41 UTC

[1/3] kafka-site git commit: add 0.9.0 docs

Repository: kafka-site
Updated Branches:
  refs/heads/asf-site e4d98494e -> 8c4a140cf


http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/ops.html
----------------------------------------------------------------------
diff --git a/090/ops.html b/090/ops.html
index b539b5c..93640b0 100644
--- a/090/ops.html
+++ b/090/ops.html
@@ -1,3 +1,20 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+ 
+    http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
 Here is some information on actually running Kafka as a production system based on usage and experience at LinkedIn. Please send us any additional tips you know of.
 
 <h3><a id="basic_ops">6.1 Basic Kafka Operations</a></h3>
@@ -79,9 +96,9 @@ Since running this command can be tedious you can also configure Kafka to do thi
 
 <h4><a id="basic_ops_mirror_maker">Mirroring data between clusters</a></h4>
 
-We refer to the process of replicating data <i>between</i> Kafka clusters "mirroring" to avoid confusion with the replication that happens amongst the nodes in a single cluster. Kafka comes with a tool for mirroring data between Kafka clusters. The tool reads from one or more source clusters and writes to a destination cluster, like this:
+We refer to the process of replicating data <i>between</i> Kafka clusters as "mirroring" to avoid confusion with the replication that happens amongst the nodes in a single cluster. Kafka comes with a tool for mirroring data between Kafka clusters. The tool reads from a source cluster and writes to a destination cluster, like this:
 <p>
-<img src="/images/mirror-maker.png">
+<img src="images/mirror-maker.png">
 <p>
 A common use case for this kind of mirroring is to provide a replica in another datacenter. This scenario will be discussed in more detail in the next section.
 <p>
@@ -280,6 +297,25 @@ Topic:foo	PartitionCount:1	ReplicationFactor:3	Configs:
 	Topic: foo	Partition: 0	Leader: 5	Replicas: 5,6,7	Isr: 5,6,7
 </pre>
 
+<h4><a id="quotas">Setting quotas</a></h4>
+It is possible to set default quotas that apply to all client-ids by setting these configs on the brokers. By default, each client-id receives an unlimited quota. The following sets the default quota per producer and consumer client-id to 10MB/sec.
+<pre>
+  quota.producer.default=10485760
+  quota.consumer.default=10485760
+</pre>
+
+It is also possible to set custom quotas for each client.
+<pre>
+> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-name clientA --entity-type clients
+Updated config for clientId: "clientA".
+</pre>
+
+Here's how to describe the quota for a given client.
+<pre>
+> ./kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-name clientA --entity-type clients
+Configs for clients:clientA are producer_byte_rate=1024,consumer_byte_rate=2048
+</pre>
+
 <h3><a id="datacenters">6.2 Datacenters</a></h3>
 
 Some deployments will need to manage a data pipeline that spans multiple datacenters. Our recommended approach to this is to deploy a local Kafka cluster in each datacenter with application instances in each datacenter interacting only with their local cluster and mirroring between clusters (see the documentation on the <a href="#basic_ops_mirror_maker">mirror maker tool</a> for how to do this).
@@ -584,6 +620,12 @@ We pay particular attention to the following metrics, on which we do graphing and alerting:
       <td>kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent</td>
       <td>between 0 and 1, ideally &gt 0.3</td>
     </tr>
+    <tr>
+      <td>Quota metrics per client-id</td>
+      <td>kafka.server:type={Produce|Fetch},client-id=([-.\w]+)</td>
+      <td>Two attributes. throttle-time indicates the amount of time in ms the client-id was throttled. Ideally = 0.
+          byte-rate indicates the data produce/consume rate of the client in bytes/sec.</td>
+    </tr>
 </tbody></table>
 
 <h4><a id="new_producer_monitoring">New producer monitoring</a></h4>
@@ -831,6 +873,16 @@ The following metrics are available on new producer instances.
       <td>The average per-second number of record sends that resulted in errors for a topic.</td>
       <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)</td>
     </tr>
+    <tr>
+      <td>produce-throttle-time-max</td>
+      <td>The maximum time in ms a request was throttled by a broker.</td>
+      <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+)</td>
+    </tr>
+    <tr>
+      <td>produce-throttle-time-avg</td>
+      <td>The average time in ms a request was throttled by a broker.</td>
+      <td>kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+)</td>
+    </tr>
 </tbody></table>
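+
+Besides JMX, the throttle metrics listed above can also be read programmatically from a producer instance. The following is only an illustrative sketch; it assumes an already constructed <code>producer</code> object, and the metric names are the ones listed in the table.
+<pre>
+import java.util.Map;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+
+// Scan the producer's registered metrics for the throttle attributes.
+for (Map.Entry&lt;MetricName, ? extends Metric&gt; entry : producer.metrics().entrySet()) {
+    String name = entry.getKey().name();
+    if (name.equals("produce-throttle-time-avg") || name.equals("produce-throttle-time-max"))
+        System.out.println(name + " = " + entry.getValue().value());  // throttle time in ms
+}
+</pre>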
 
 We recommend monitoring GC time and other stats, as well as various server stats such as CPU utilization, I/O service time, etc.

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/producer_config.html
----------------------------------------------------------------------
diff --git a/090/producer_config.html b/090/producer_config.html
new file mode 100644
index 0000000..ed9ce9e
--- /dev/null
+++ b/090/producer_config.html
@@ -0,0 +1,106 @@
+<table>
+<tr>
+<th>Name</th>
+<th>Type</th>
+<th>Default</th>
+<th>Valid Values</th>
+<th>Importance</th>
+<th>Description</th>
+</tr>
+<tr>
+<td>bootstrap.servers</td><td>list</td><td></td><td></td><td>high</td><td>A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping&mdash;this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form <code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).</td></tr>
+<tr>
+<td>key.serializer</td><td>class</td><td></td><td></td><td>high</td><td>Serializer class for key that implements the <code>Serializer</code> interface.</td></tr>
+<tr>
+<td>value.serializer</td><td>class</td><td></td><td></td><td>high</td><td>Serializer class for value that implements the <code>Serializer</code> interface.</td></tr>
+<tr>
+<td>acks</td><td>string</td><td>1</td><td>[all, -1, 0, 1]</td><td>high</td><td>The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common: <ul> <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the <code>retries</code> configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.</ul></td></tr>
+<tr>
+<td>buffer.memory</td><td>long</td><td>33554432</td><td>[0,...]</td><td>high</td><td>The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by <code>block.on.buffer.full</code>. <p>This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.</td></tr>
+<tr>
+<td>compression.type</td><td>string</td><td>none</td><td></td><td>high</td><td>The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid  values are <code>none</code>, <code>gzip</code>, <code>snappy</code>, or <code>lz4</code>. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).</td></tr>
+<tr>
+<td>retries</td><td>int</td><td>0</td><td>[0,...,2147483647]</td><td>high</td><td>Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second succeeds, then the second record may appear first.</td></tr>
+<tr>
+<td>ssl.key.password</td><td>string</td><td>null</td><td></td><td>high</td><td>The password of the private key in the key store file. This is optional for the client.</td></tr>
+<tr>
+<td>ssl.keystore.location</td><td>string</td><td>null</td><td></td><td>high</td><td>The location of the key store file. This is optional for the client and can be used for two-way client authentication.</td></tr>
+<tr>
+<td>ssl.keystore.password</td><td>string</td><td>null</td><td></td><td>high</td><td>The store password for the key store file. This is optional for the client and only needed if ssl.keystore.location is configured.</td></tr>
+<tr>
+<td>ssl.truststore.location</td><td>string</td><td>null</td><td></td><td>high</td><td>The location of the trust store file. </td></tr>
+<tr>
+<td>ssl.truststore.password</td><td>string</td><td>null</td><td></td><td>high</td><td>The password for the trust store file. </td></tr>
+<tr>
+<td>batch.size</td><td>int</td><td>16384</td><td>[0,...]</td><td>medium</td><td>The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. <p>No attempt will be made to batch records larger than this size. <p>Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. <p>A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.</td></tr>
+<tr>
+<td>client.id</td><td>string</td><td>""</td><td></td><td>medium</td><td>An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.</td></tr>
+<tr>
+<td>connections.max.idle.ms</td><td>long</td><td>540000</td><td></td><td>medium</td><td>Close idle connections after the number of milliseconds specified by this config.</td></tr>
+<tr>
+<td>linger.ms</td><td>long</td><td>0</td><td>[0,...]</td><td>medium</td><td>The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay&mdash;that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get <code>batch.size</code> worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>linger.ms=5</code>, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.</td></tr>
+<tr>
+<td>max.block.ms</td><td>long</td><td>60000</td><td>[0,...]</td><td>medium</td><td>The configuration controls how long <code>KafkaProducer.send()</code> and <code>KafkaProducer.partitionsFor()</code> will block. These methods can block for multiple reasons, e.g. the buffer being full or metadata being unavailable. This configuration imposes a maximum limit on the total time spent fetching metadata, serializing the key and value, partitioning and allocating buffer memory when doing a send(). In the case of partitionsFor(), this configuration imposes a maximum time threshold on waiting for metadata.</td></tr>
+<tr>
+<td>max.request.size</td><td>int</td><td>1048576</td><td>[0,...]</td><td>medium</td><td>The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.</td></tr>
+<tr>
+<td>partitioner.class</td><td>class</td><td>class org.apache.kafka.clients.producer.internals.DefaultPartitioner</td><td></td><td>medium</td><td>Partitioner class that implements the <code>Partitioner</code> interface.</td></tr>
+<tr>
+<td>receive.buffer.bytes</td><td>int</td><td>32768</td><td>[0,...]</td><td>medium</td><td>The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.</td></tr>
+<tr>
+<td>request.timeout.ms</td><td>int</td><td>30000</td><td>[0,...]</td><td>medium</td><td>The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.</td></tr>
+<tr>
+<td>sasl.kerberos.service.name</td><td>string</td><td>null</td><td></td><td>medium</td><td>The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config.</td></tr>
+<tr>
+<td>security.protocol</td><td>string</td><td>PLAINTEXT</td><td></td><td>medium</td><td>Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.</td></tr>
+<tr>
+<td>send.buffer.bytes</td><td>int</td><td>131072</td><td>[0,...]</td><td>medium</td><td>The size of the TCP send buffer (SO_SNDBUF) to use when sending data.</td></tr>
+<tr>
+<td>ssl.enabled.protocols</td><td>list</td><td>[TLSv1.2, TLSv1.1, TLSv1]</td><td></td><td>medium</td><td>The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.</td></tr>
+<tr>
+<td>ssl.keystore.type</td><td>string</td><td>JKS</td><td></td><td>medium</td><td>The file format of the key store file. This is optional for the client. Default value is JKS.</td></tr>
+<tr>
+<td>ssl.protocol</td><td>string</td><td>TLS</td><td></td><td>medium</td><td>The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.</td></tr>
+<tr>
+<td>ssl.provider</td><td>string</td><td>null</td><td></td><td>medium</td><td>The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.</td></tr>
+<tr>
+<td>ssl.truststore.type</td><td>string</td><td>JKS</td><td></td><td>medium</td><td>The file format of the trust store file. Default value is JKS.</td></tr>
+<tr>
+<td>timeout.ms</td><td>int</td><td>30000</td><td>[0,...]</td><td>medium</td><td>The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.</td></tr>
+<tr>
+<td>block.on.buffer.full</td><td>boolean</td><td>false</td><td></td><td>low</td><td>When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default this setting is true and we block, however in some scenarios blocking is not desirable and it is better to immediately give an error. Setting this to <code>false</code> will accomplish that: the producer will throw a BufferExhaustedException if a record is sent and the buffer space is full.</td></tr>
+<tr>
+<td>max.in.flight.requests.per.connection</td><td>int</td><td>5</td><td>[1,...]</td><td>low</td><td>The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).</td></tr>
+<tr>
+<td>metadata.fetch.timeout.ms</td><td>long</td><td>60000</td><td>[0,...]</td><td>low</td><td>The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This config specifies the maximum amount of time we will block waiting for this fetch to succeed before throwing an exception back to the client.</td></tr>
+<tr>
+<td>metadata.max.age.ms</td><td>long</td><td>300000</td><td>[0,...]</td><td>low</td><td>The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.</td></tr>
+<tr>
+<td>metric.reporters</td><td>list</td><td>[]</td><td></td><td>low</td><td>A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.</td></tr>
+<tr>
+<td>metrics.num.samples</td><td>int</td><td>2</td><td>[1,...]</td><td>low</td><td>The number of samples maintained to compute metrics.</td></tr>
+<tr>
+<td>metrics.sample.window.ms</td><td>long</td><td>30000</td><td>[0,...]</td><td>low</td><td>The window of time a metrics sample is computed over.</td></tr>
+<tr>
+<td>principal.builder.class</td><td>class</td><td>class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder</td><td></td><td>low</td><td>The principal builder class used to generate a Java Principal. This config is optional for clients.</td></tr>
+<tr>
+<td>reconnect.backoff.ms</td><td>long</td><td>50</td><td>[0,...]</td><td>low</td><td>The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.</td></tr>
+<tr>
+<td>retry.backoff.ms</td><td>long</td><td>100</td><td>[0,...]</td><td>low</td><td>The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.</td></tr>
+<tr>
+<td>sasl.kerberos.kinit.cmd</td><td>string</td><td>/usr/bin/kinit</td><td></td><td>low</td><td>Kerberos kinit command path. Default is /usr/bin/kinit</td></tr>
+<tr>
+<td>sasl.kerberos.min.time.before.relogin</td><td>long</td><td>60000</td><td></td><td>low</td><td>Login thread sleep time between refresh attempts.</td></tr>
+<tr>
+<td>sasl.kerberos.ticket.renew.jitter</td><td>double</td><td>0.05</td><td></td><td>low</td><td>Percentage of random jitter added to the renewal time.</td></tr>
+<tr>
+<td>sasl.kerberos.ticket.renew.window.factor</td><td>double</td><td>0.8</td><td></td><td>low</td><td>Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket.</td></tr>
+<tr>
+<td>ssl.cipher.suites</td><td>list</td><td>null</td><td></td><td>low</td><td>A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.</td></tr>
+<tr>
+<td>ssl.endpoint.identification.algorithm</td><td>string</td><td>null</td><td></td><td>low</td><td>The endpoint identification algorithm to validate server hostname using server certificate. </td></tr>
+<tr>
+<td>ssl.keymanager.algorithm</td><td>string</td><td>SunX509</td><td></td><td>low</td><td>The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.</td></tr>
+<tr>
+<td>ssl.trustmanager.algorithm</td><td>string</td><td>PKIX</td><td></td><td>low</td><td>The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.</td></tr>
+</table>
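+
+<p>
+For reference, here is a minimal sketch of how these configs are typically supplied to the Java producer. The broker address, topic and serializer choices below are placeholders, not recommendations; unlisted configs keep their defaults from the table above.
+</p>
+<pre>
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9092");  // placeholder broker list
+props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+props.put("acks", "all");      // wait for the full set of in-sync replicas
+props.put("retries", "3");     // retry transient send failures
+props.put("linger.ms", "5");   // trade up to 5ms of latency for better batching
+
+Producer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(props);
+producer.send(new ProducerRecord&lt;String, String&gt;("my-topic", "key", "value"));
+producer.close();
+</pre>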

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/quickstart.html
----------------------------------------------------------------------
diff --git a/090/quickstart.html b/090/quickstart.html
index 1e5c3ab..ac5623b 100644
--- a/090/quickstart.html
+++ b/090/quickstart.html
@@ -1,3 +1,20 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+ 
+    http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
 <h3><a id="quickstart">1.3 Quick Start</a></h3>
 
 This tutorial assumes you are starting fresh and have no existing Kafka or ZooKeeper data.
@@ -170,3 +187,65 @@ my test message 1
 my test message 2
 <b>^C</b>
 </pre>
+
+
+<h4>Step 7: Use Kafka Connect to import/export data</h4>
+
+Writing data from the console and writing it back to the console is a convenient place to start, but you'll probably want
+to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom
+integration code you can use Kafka Connect to import or export data.
+
+Kafka Connect is a tool included with Kafka that imports data to and exports data from Kafka. It is an extensible tool that runs
+<i>connectors</i>, which implement the custom logic for interacting with an external system. In this quickstart we'll see
+how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a
+Kafka topic to a file.
+
+First, we’ll start by creating some seed data to test with:
+
+<pre>
+&gt; <b>echo -e "foo\nbar" > test.txt</b>
+</pre>
+
+Next, we'll start two connectors running in <i>standalone</i> mode, which means they run in a single, local, dedicated
+process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect
+process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data.
+The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector
+class to instantiate, and any other configuration required by the connector.
+
+<pre>
+&gt; <b>bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties</b>
+</pre>
+
+These sample configuration files, included with Kafka, use the default local cluster configuration you started earlier
+and create two connectors: the first is a source connector that reads lines from an input file and produces each to a Kafka topic
+and the second is a sink connector that reads messages from a Kafka topic and produces each as a line in an output file.
+
+During startup you'll see a number of log messages, including some indicating that the connectors are being instantiated.
+Once the Kafka Connect process has started, the source connector should start reading lines from <code>test.txt</code> and
+producing them to the topic <code>connect-test</code>, and the sink connector should start reading messages from the topic <code>connect-test</code>
+and writing them to the file <code>test.sink.txt</code>. We can verify the data has been delivered through the entire pipeline
+by examining the contents of the output file:
+
+<pre>
+&gt; <b>cat test.sink.txt</b>
+foo
+bar
+</pre>
+
+Note that the data is being stored in the Kafka topic <code>connect-test</code>, so we can also run a console consumer to see the
+data in the topic (or use custom consumer code to process it):
+
+<pre>
+&gt; <b>bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning</b>
+{"schema":{"type":"string","optional":false},"payload":"foo"}
+{"schema":{"type":"string","optional":false},"payload":"bar"}
+...
+</pre>
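+
+If you'd rather process the topic with your own code than with the console consumer, a minimal sketch using the new Java consumer could look like the following (the group id is an arbitrary placeholder):
+
+<pre>
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9092");
+props.put("group.id", "connect-test-reader");   // placeholder group id
+props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
+consumer.subscribe(Arrays.asList("connect-test"));
+while (true) {
+    ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
+    for (ConsumerRecord&lt;String, String&gt; record : records)
+        System.out.println(record.value());   // each value is a JSON envelope like the lines above
+}
+</pre>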
+
+The connectors continue to process data, so we can add data to the file and see it move through the pipeline:
+
+<pre>
+&gt; <b>echo "Another line" >> test.txt</b>
+</pre>
+
+You should see the line appear in the console consumer output and in the sink file.

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/security.html
----------------------------------------------------------------------
diff --git a/090/security.html b/090/security.html
new file mode 100644
index 0000000..80e30bc
--- /dev/null
+++ b/090/security.html
@@ -0,0 +1,301 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<h3><a id="security_overview">7.1 Security Overview</a></h3>
+In release 0.9.0.0, the Kafka community added a number of features that, used either separately or together, increase security in a Kafka cluster. The following security measures are currently supported:
+<ol>
+    <li>Authentication of connections to brokers from clients (producers and consumers), using either SSL or SASL (Kerberos)</li>
+    <li>Authorizing read / write operations by clients</li>
+    <li>Encryption of data sent between brokers and clients, or between brokers, using SSL</li>
+    <li>Authenticate brokers connecting to ZooKeeper</li>
+    <li>Security is optional - non-secured clusters are supported, as well as a mix of authenticated, unauthenticated, encrypted and non-encrypted clients.</li>
+    <li>Authorization is pluggable and supports integration with external authorization services</li>
+</ol>
+
+The guides below explain how to configure and use the security features in both clients and brokers.
+
+<h3><a id="security_ssl">7.2 Encryption and Authentication using SSL</a></h3>
+Apache Kafka allows clients to connect over SSL. By default SSL is disabled but can be turned on as needed.
+
+<ol>
+    <li><h4>Generate SSL key and certificate for each Kafka broker</h4>
+        The first step of deploying SSL is to generate the key and the certificate for each machine in the cluster. You can use Java’s keytool utility to accomplish this task.
+        We will generate the key into a temporary keystore initially so that we can export and sign it later with a CA.
+        <pre>$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey</pre>
+
+        You need to specify two parameters in the above command:
+        <ol>
+            <li>keystore: the keystore file that stores the certificate. The keystore file contains the private key of the certificate; therefore, it needs to be kept safely.</li>
+            <li>validity: the valid time of the certificate in days.</li>
+        </ol>
+        Ensure that the common name (CN) matches exactly the fully qualified domain name (FQDN) of the server. The client compares the CN with the DNS domain name to ensure that it is indeed connecting to the desired server, not a malicious one.</li>
+
+    <li><h4>Creating your own CA</h4>
+        After the first step, each machine in the cluster has a public-private key pair, and a certificate to identify the machine. The certificate, however, is unsigned, which means that an attacker can create such a certificate to pretend to be any machine.<p>
+        Therefore, it is important to prevent forged certificates by signing them for each machine in the cluster. A certificate authority (CA) is responsible for signing certificates. A CA works like a government that issues passports—the government stamps (signs) each passport so that the passport becomes difficult to forge. Other governments verify the stamps to ensure the passport is authentic. Similarly, the CA signs the certificates, and the cryptography guarantees that a signed certificate is computationally difficult to forge. Thus, as long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to the authentic machines.
+        <pre>openssl req <b>-new</b> -x509 -keyout ca-key -out ca-cert -days 365</pre>
+
+        The generated CA is simply a public-private key pair and certificate, and it is intended to sign other certificates.<br>
+
+        The next step is to add the generated CA to the <b>clients’ truststore</b> so that the clients can trust this CA:
+        <pre>keytool -keystore server.truststore.jks -alias CARoot <b>-import</b> -file ca-cert</pre>
+
+        <b>Note:</b> If you configure the Kafka brokers to require client authentication by setting ssl.client.auth to be "requested" or "required" in the <a href="#config_broker">Kafka broker config</a>, then you must provide a truststore for the Kafka brokers as well, and it should have all the CA certificates that clients' keys were signed by.
+        <pre>keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert</pre>
+
+        In contrast to the keystore in step 1 that stores each machine’s own identity, the truststore of a client stores all the certificates that the client should trust. Importing a certificate into one’s truststore also means trusting all certificates that are signed by that certificate. As in the analogy above, trusting the government (CA) also means trusting all passports (certificates) that it has issued. This attribute is called the chain of trust, and it is particularly useful when deploying SSL on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that trusts the CA. That way all machines can authenticate all other machines.</li>
+
+        <li><h4>Signing the certificate</h4>
+        The next step is to sign all certificates generated by step 1 with the CA generated in step 2. First, you need to export the certificate from the keystore:
+        <pre>keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file</pre>
+
+        Then sign it with the CA:
+        <pre>openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}</pre>
+
+        Finally, you need to import both the certificate of the CA and the signed certificate into the keystore:
+        <pre>
+            $ keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
+            $ keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
+        </pre>
+
+        The definitions of the parameters are the following:
+        <ol>
+            <li>keystore: the location of the keystore</li>
+            <li>ca-cert: the certificate of the CA</li>
+            <li>ca-key: the private key of the CA</li>
+            <li>ca-password: the passphrase of the CA</li>
+            <li>cert-file: the exported, unsigned certificate of the server</li>
+            <li>cert-signed: the signed certificate of the server</li>
+        </ol>
+
+        Here is an example of a bash script with all above steps. Note that one of the commands assumes a password of <code>test1234</code>, so either use that password or edit the command before running it.
+            <pre>
+        #!/bin/bash
+        #Step 1
+        keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
+        #Step 2
+        openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
+        keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
+        keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
+        #Step 3
+        keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
+        openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
+        keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
+        keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
+                </pre></li>
+    <li><h4><a name="config_broker">Configuring Kafka Broker</a></h4>
+        Kafka Broker comes with the feature of listening on multiple ports thanks to <a href="https://issues.apache.org/jira/browse/KAFKA-1809">KAFKA-1809</a>.
+        We need to configure the following property in server.properties, which must have one or more comma-separated values:
+        <pre>listeners</pre>
+
+        If SSL is not enabled for inter-broker communication (see below for how to enable it), both PLAINTEXT and SSL ports will be necessary.
+        <pre>listeners=PLAINTEXT://host.name:port,SSL://host.name:port</pre>
+
+        The following SSL configs are needed on the broker side:
+        <pre>
+        ssl.keystore.location = /var/private/ssl/kafka.server.keystore.jks
+        ssl.keystore.password = test1234
+        ssl.key.password = test1234
+        ssl.truststore.location = /var/private/ssl/kafka.server.truststore.jks
+        ssl.truststore.password = test1234
+        </pre>
+
+        Optional settings that are worth considering:
+        <ol>
+            <li>ssl.client.auth = none ("required" => client authentication is required, "requested" => client authentication is requested and a client without certs can still connect when this option is chosen)</li>
+            <li>ssl.cipher.suites = A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. (Default is an empty list)</li>
+            <li>ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1 (list out the SSL protocols that you are going to accept from clients. Do note that SSL is deprecated and using it in production is not recommended)</li>
+            <li> ssl.keystore.type = JKS</li>
+            <li>ssl.truststore.type = JKS</li>
+        </ol>
+        If you want to enable SSL for inter-broker communication, add the following to the broker properties file (it defaults to PLAINTEXT)
+        <pre>security.inter.broker.protocol = SSL</pre>
+
+        If you want to enable any cipher suites other than the defaults that come with the JVM, like the ones listed here:
+        <a href="https://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html">https://docs.oracle.com/javase/7/docs/technotes/guides/security/SunProviders.html</a> you will need to install <b><a href="http://www.oracle.com/technetwork/java/javase/downloads/jce-7-download-432124.html">Unlimited Strength Policy files</a></b><br>
+
+        Once you start the broker you should be able to see the following in server.log:
+        <pre>with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)</pre>
+
+        To quickly check if the server keystore and truststore are set up properly you can run the following command:
+        <pre>openssl s_client -debug -connect localhost:9093 -tls1</pre> (Note: TLSv1 should be listed under ssl.enabled.protocols)<br>
+        In the output of this command you should see the server's certificate:
+        <pre>
+        -----BEGIN CERTIFICATE-----
+        {variable sized random bytes}
+        -----END CERTIFICATE-----
+        subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
+        issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com
+            </pre>
+        If the certificate does not show up or if there are any other error messages, then your keystore is not set up properly.</li>
+
+        <li><h4>Configuring Kafka Clients</h4>
+        SSL is supported only for the new Kafka producer and consumer; the older APIs are not supported. The SSL configs are the same for both producer and consumer (a Java client sketch follows this section).<br>
+        If client authentication is not required in the broker, then the following is a minimal configuration example:
+        <pre>
+        security.protocol = SSL
+        ssl.truststore.location = "/var/private/ssl/kafka.client.truststore.jks"
+        ssl.truststore.password = "test1234"
+            </pre>
+
+        If client authentication is required, then a keystore must be created like in step 1 and the following must also be configured:
+            <pre>
+        ssl.keystore.location = "/var/private/ssl/kafka.client.keystore.jks"
+        ssl.keystore.password = "test1234"
+        ssl.key.password = "test1234"
+                </pre>
+        Other configuration settings that may also be needed depending on your requirements and the broker configuration:
+            <ol>
+                <li>ssl.provider (Optional). The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.</li>
+                <li>ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.</li>
+                <li>ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. This should list at least one of the protocols configured on the broker side.</li>
+                <li>ssl.truststore.type = "JKS"</li>
+                <li>ssl.keystore.type = "JKS"</li>
+            </ol>
+<br>
+        Examples using console-producer and console-consumer:
+        <pre>
+            kafka-console-producer.sh --broker-list localhost:9093 --topic test --new-producer --producer-property "security.protocol=SSL"  --producer-property "ssl.truststore.location=client.truststore.jks" --producer-property "ssl.truststore.password=test1234"
+
+            kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --new-consumer --consumer.config client-ssl.properties
+            </pre>
+    </li>
+</ol>
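+
+For clients using the Java producer or consumer API, the same SSL settings can be supplied programmatically. The following is only a sketch; the paths and passwords are the placeholder values used above and should be replaced with your own.
+<pre>
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9093");   // the broker's SSL listener
+props.put("security.protocol", "SSL");
+props.put("ssl.truststore.location", "/var/private/ssl/kafka.client.truststore.jks");
+props.put("ssl.truststore.password", "test1234");
+// Only needed if ssl.client.auth is set to "required" or "requested" on the broker:
+props.put("ssl.keystore.location", "/var/private/ssl/kafka.client.keystore.jks");
+props.put("ssl.keystore.password", "test1234");
+props.put("ssl.key.password", "test1234");
+props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+Producer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(props);
+</pre>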
+<h3><a id="security_sasl">7.3 Authentication using SASL</a></h3>
+
+<ol>
+    <li><h4>Prerequisites</h4><br>
+    <ol>
+        <li><b>Kerberos</b><br>
+        If your organization is already using a Kerberos server (for example, by using Active Directory), there is no need to install a new server just for Kafka. Otherwise you will need to install one; your Linux vendor likely has packages for Kerberos and a short guide on how to install and configure it (<a href="https://help.ubuntu.com/community/Kerberos">Ubuntu</a>, <a href="https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Managing_Smart_Cards/installing-kerberos.html">Redhat</a>). Note that if you are using Oracle Java, you will need to download JCE policy files for your Java version and copy them to $JAVA_HOME/jre/lib/security.</li>
+        <li><b>Create Kerberos Principals</b><br>
+        If you are using the organization's Kerberos or Active Directory server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every Linux user that will access Kafka with Kerberos authentication.<br>
+        If you installed your own Kerberos, you will need to create these principals yourself:<br>
+            <code>sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/hostname@domainname'<br>
+                sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/kafka.keytab kafka/hostname@domainname"</code></li>
+        <li><b>Make sure all hosts are reachable using hostnames</b> - it is important for Kerberos that all your hosts can be resolved by their FQDNs.</li>
+        <li><b><a name="jaas_config_file">Creating JAAS Config File</a></b><br>
+            Each node in the cluster should have a JAAS file similar to the example below. Add this file to kafka/config dir:
+        <pre>
+            KafkaServer {
+                com.sun.security.auth.module.Krb5LoginModule required
+                useKeyTab=true
+                storeKey=true
+                serviceName="kafka"
+                keyTab="/etc/security/keytabs/kafka1.keytab"
+                principal="kafka/kafka1.hostname.com@DOMAIN.COM";
+            };
+
+            Client {
+               com.sun.security.auth.module.Krb5LoginModule required
+               useKeyTab=true
+               storeKey=true
+               serviceName="zookeeper"
+               keyTab="/etc/security/keytabs/kafka1.keytab"
+               principal="kafka/kafka1.hostname.com@DOMAIN.COM";
+            };
+
+            KafkaClient {
+               com.sun.security.auth.module.Krb5LoginModule required
+               useTicketCache=true
+               serviceName="kafka";
+            };
+        </pre>
+            <u>Important notes:</u>
+            <ol>
+                <li>KafkaServer is the section name in the JAAS file used by the Kafka broker. It tells the broker which principal to use and the keytab in which that principal is stored, and allows the broker to log in using that keytab.</li>
+                <li>The Client section is used to authenticate a SASL connection with ZooKeeper. It also allows the broker to set SASL ACLs on ZooKeeper nodes, which locks these nodes down so that only the kafka broker can modify them. It is necessary to have the same principal name across all brokers. If you want to use a section name other than Client, then you need to set the system property <tt>zookeeper.sasl.client</tt> to the appropriate name (<i>e.g.</i>, <tt>-Dzookeeper.sasl.client=ZkClient</tt>).</li>
+                <li>The KafkaClient section describes how clients like the producer and consumer can connect to the Kafka broker. Here we specified "useTicketCache=true" rather than a keytab; this allows a user to do kinit and then run kafka-console-consumer or kafka-console-producer to connect to the broker. For a long-running process one should create a KafkaClient section similar to KafkaServer.</li>
+                <li>The KafkaServer and KafkaClient sections also specify "serviceName"; this should match the principal name the Kafka broker is running with. In the above example the principal is "kafka/kafka1.hostname.com@DOMAIN.COM", so "kafka" matches the principal name.</li>
+            </ol>
+        </li>
+        <li><b><a name="jaas_client">Creating Client Side JAAS Config</a></b><br>
+        Clients (producers, consumers, connect workers, etc) will authenticate to the cluster with their own principal (usually with the same name as the user used for running the client), so obtain or create these principals as needed. Then create a JAAS file as follows:
+            <pre>
+                KafkaClient {
+                    com.sun.security.auth.module.Krb5LoginModule required
+                    useKeyTab=true
+                    storeKey=true
+                    serviceName="kafka"
+                    keyTab="/etc/security/keytabs/kafka1.keytab"
+                    principal="kafkaproducer/hostname@DOMAIN.COM";
+                };
+            </pre>
+        </li>
+    </ol></li>
+    <li><h4>Configuring Kafka Brokers</h4>
+    <ol>
+        <li>Pass the name of the jaas file you created in <a href="#jaas_config_file">Creating JAAS Config File</a> as a JVM parameter to the kafka broker: <pre>-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf</pre></li>
+        <li>Make sure the keytabs configured in the kafka_jaas.conf are readable by the linux user who is starting kafka broker.</li>
+        <li>Configure a SASL port in server.properties, by adding the following to the <i>listeners</i> parameter, which contains one or more comma-separated values:
+            <pre>listeners=SASL_PLAINTEXT://host.name:port</pre>
+        If you are only configuring a SASL port (or if you are very paranoid and want the Kafka brokers to authenticate each other using SASL) then make sure you set the same SASL protocol for inter-broker communication:
+        <pre>security.inter.broker.protocol=SASL_PLAINTEXT</pre></li>
+
+    </ol>
+    </li>
+    <li><h4>Configuring Kafka Clients</h4>
+        SASL authentication is only supported for the new Kafka producer and consumer; the older APIs are not supported.<br>
+        To configure SASL authentication on the clients (a Java client sketch follows this list):
+        <ol>
+            <li>Pass the name of the jaas file you created in <a href="#jaas_client">Creating Client Side JAAS Config</a> as a JVM parameter to the client JVM:
+        <pre>-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</pre></li>
+            <li>Make sure the keytabs configured in the kafka_client_jaas.conf are readable by the linux user who is starting kafka client.</li>
+            <li>Configure the following property in producer.properties or consumer.properties:
+                <pre>security.protocol=SASL_PLAINTEXT</pre></li>
+        </ol></li>
+</ol>
+
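+A client-side Java sketch for SASL, assuming the JVM was started with <code>-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf</code> as described above; the broker address is a placeholder, and the serviceName must match the broker principal's service name:
+<pre>
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+
+Properties props = new Properties();
+props.put("bootstrap.servers", "kafka1.hostname.com:9094");  // placeholder SASL_PLAINTEXT listener
+props.put("security.protocol", "SASL_PLAINTEXT");
+props.put("sasl.kerberos.service.name", "kafka");            // must match the broker's principal name
+props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+Producer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(props);
+</pre>
+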
+<h3><a id="security_authz">7.4 Authorization and ACLs</a></h3>
+<h3><a id="zk_authz">7.5 ZooKeeper Authentication</a></h3>
+<h4><a id="zk_authz_new">7.5.1 New clusters</a></h4>
+To enable ZooKeeper authentication on brokers, there are two necessary steps:
+<ol>
+	<li> Create a JAAS login file and set the appropriate system property to point to it as described above</li>
+	<li> Set the configuration property <tt>zookeeper.set.acl</tt> in each broker to true</li>
+</ol>
+
+The metadata stored in ZooKeeper is such that only brokers will be able to modify the corresponding znodes, but znodes are world readable. The rationale behind this decision is that the data stored in ZooKeeper is not sensitive, but inappropriate manipulation of znodes can cause cluster disruption.
+
+<h4><a id="zk_authz_migration">7.5.2 Migrating clusters</a></h4>
+If you are running a version of Kafka that does not support security or simply with security disabled, and you want to make the cluster secure, then you need to execute the following steps to enable ZooKeeper authentication with minimal disruption to your operations:
+<ol>
+	<li>Perform a rolling restart setting the JAAS login file, which enables brokers to authenticate. At the end of the rolling restart, brokers are able to manipulate znodes with strict ACLs, but they will not create znodes with those ACLs</li>
+	<li>Perform a second rolling restart of brokers, this time setting the configuration parameter <tt>zookeeper.set.acl</tt> to true, which enables the use of secure ACLs when creating znodes</li>
+	<li>Execute the ZkSecurityMigrator tool. To execute the tool, run the script <tt>./bin/zookeeper-security-migration.sh</tt> with <tt>zookeeper.acl</tt> set to secure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes</li>
+</ol>
+<p>It is also possible to turn off authentication in a secure cluster. To do it, follow these steps:</p>
+<ol>
+	<li>Perform a rolling restart of brokers setting the JAAS login file, which enables brokers to authenticate, but setting <tt>zookeeper.set.acl</tt> to false. At the end of the rolling restart, brokers stop creating znodes with secure ACLs, but are still able to authenticate and manipulate all znodes</li>
+	<li>Execute the ZkSecurityMigrator tool. To execute the tool, run this script <tt>./bin/zookeeper-security-migration.sh</tt> with <tt>zookeeper.acl</tt> set to unsecure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes</li>
+	<li>Perform a second rolling restart of brokers, this time omitting the system property that sets the JAAS login file</li>
+</ol>
+Here is an example of how to run the migration tool:
+<pre>
+./bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connection=localhost:2181
+</pre>
+<p>Run this to see the full list of parameters:</p>
+<pre>
+./bin/zookeeper-security-migration.sh --help
+</pre>
+<h4><a id="zk_authz_ensemble">7.5.3 Migrating the ZooKeeper ensemble</a></h4>
+It is also necessary to enable authentication on the ZooKeeper ensemble. To do it, we need to perform a rolling restart of the ZooKeeper servers and set a few properties. Please refer to the ZooKeeper documentation for more detail:
+<ol>
+	<li><a href="http://zookeeper.apache.org/doc/r3.4.6/zookeeperProgrammers.html#sc_ZooKeeperAccessControl">Apache ZooKeeper documentation</a></li>
+	<li><a href="https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zookeeper+and+SASL">Apache ZooKeeper wiki</a></li>
+	<li><a href="http://www.cloudera.com/content/www/en-us/documentation/cdh/5-1-x/CDH5-Security-Guide/cdh5sg_zookeeper_security.html">Cloudera ZooKeeper security configuration</a></li>
+</ol>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/upgrade.html
----------------------------------------------------------------------
diff --git a/090/upgrade.html b/090/upgrade.html
index 2be08dd..69ff20a 100644
--- a/090/upgrade.html
+++ b/090/upgrade.html
@@ -1,12 +1,29 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
 <h3><a id="upgrade">1.5 Upgrading From Previous Versions</a></h3>
 
-<h4>Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.8.3.0</h4>
+<h4>Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0</h4>
 
-0.8.3.0 has an inter-broker protocol change from previous versions. For a rolling upgrade:
+0.9.0.0 has an inter-broker protocol change from previous versions. For a rolling upgrade:
 <ol>
 	<li> Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=0.8.2.X </li>
 	<li> Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it. </li>
-	<li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.8.3.0.</li>
+	<li> Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.9.0.0.</li>
 	<li> Restart the brokers one by one for the new protocol version to take effect </li>
 </ol>
 
@@ -14,6 +31,26 @@ Note: If you are willing to accept downtime, you can simply take all the brokers
 
 Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.
 
+<h5>Potential breaking changes in 0.9.0.0</h5>
+
+<ul>
+    <li> Java 1.6 is no longer supported. </li>
+    <li> Tools packaged under <em>org.apache.kafka.clients.tools.*</em> have been moved to <em>org.apache.kafka.tools.*</em>. All included scripts will still function as usual; only custom code directly importing these classes will be affected. </li>
+    <li> The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh. </li>
+    <li> The kafka-topics.sh script (kafka.admin.TopicCommand) now exits with non-zero exit code on failure. </li>
+    <li> The kafka-topics.sh script (kafka.admin.TopicCommand) will now print a warning when topic names risk metric collisions due to the use of a '.' or '_' in the topic name, and error in the case of an actual collision. </li>
+    <li> The kafka-console-producer.sh script (kafka.tools.ConsoleProducer) will use the new producer instead of the old producer by default, and users have to specify 'old-producer' to use the old producer. </li>
+    <li> By default all command line tools will print all logging messages to stderr instead of stdout. </li>
+</ul>
+
+<h5>Deprecations in 0.9.0.0</h5>
+
+<ul>
+    <li> Altering topic configuration from the kafka-topics.sh script (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality. </li>
+    <li> The kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this functionality. </li>
+    <li> The kafka.tools.ProducerPerformance class has been deprecated. Going forward, please use org.apache.kafka.tools.ProducerPerformance for this functionality (kafka-producer-perf-test.sh will also be changed to use the new class). </li>
+</ul>
+
 <h4>Upgrading from 0.8.1 to 0.8.2.0</h4>
 
 0.8.2.0 is fully compatible with 0.8.1. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it.

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/uses.html
----------------------------------------------------------------------
diff --git a/090/uses.html b/090/uses.html
index 6b73ddd..aa87d07 100644
--- a/090/uses.html
+++ b/090/uses.html
@@ -1,3 +1,20 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+ 
+    http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
 <h3><a id="uses">1.2 Use Cases</a></h3>
 
 Here is a description of a few of the popular use cases for Apache Kafka. For an overview of a number of these areas in action, see <a href="http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying">this blog post</a>.


[2/3] kafka-site git commit: add 0.9.0 docs

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/connect_config.html
----------------------------------------------------------------------
diff --git a/090/connect_config.html b/090/connect_config.html
new file mode 100644
index 0000000..c683d54
--- /dev/null
+++ b/090/connect_config.html
@@ -0,0 +1,112 @@
+<table>
+<tr>
+<th>Name</th>
+<th>Type</th>
+<th>Default</th>
+<th>Valid Values</th>
+<th>Importance</th>
+<th>Description</th>
+</tr>
+<tr>
+<td>group.id</td><td>string</td><td></td><td></td><td>high</td><td>A unique string that identifies the Connect cluster group this worker belongs to.</td></tr>
+<tr>
+<td>internal.key.converter</td><td>class</td><td></td><td></td><td>high</td><td>Converter class for internal key Connect data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.</td></tr>
+<tr>
+<td>internal.value.converter</td><td>class</td><td></td><td></td><td>high</td><td>Converter class for offset value Connect data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.</td></tr>
+<tr>
+<td>key.converter</td><td>class</td><td></td><td></td><td>high</td><td>Converter class for key Connect data that implements the <code>Converter</code> interface.</td></tr>
+<tr>
+<td>value.converter</td><td>class</td><td></td><td></td><td>high</td><td>Converter class for value Connect data that implements the <code>Converter</code> interface.</td></tr>
+<tr>
+<td>bootstrap.servers</td><td>list</td><td>[localhost:9092]</td><td></td><td>high</td><td>A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping&mdash;this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form <code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).</td></tr>
+<tr>
+<td>cluster</td><td>string</td><td>connect</td><td></td><td>high</td><td>ID for this cluster, which is used to provide a namespace so multiple Kafka Connect clusters or instances may co-exist while sharing a single Kafka cluster.</td></tr>
+<tr>
+<td>heartbeat.interval.ms</td><td>int</td><td>3000</td><td></td><td>high</td><td>The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.</td></tr>
+<tr>
+<td>session.timeout.ms</td><td>int</td><td>30000</td><td></td><td>high</td><td>The timeout used to detect failures when using Kafka's group management facilities.</td></tr>
+<tr>
+<td>ssl.key.password</td><td>string</td><td>null</td><td></td><td>high</td><td>The password of the private key in the key store file. This is optional for client.</td></tr>
+<tr>
+<td>ssl.keystore.location</td><td>string</td><td>null</td><td></td><td>high</td><td>The location of the key store file. This is optional for client and can be used for two-way authentication for client.</td></tr>
+<tr>
+<td>ssl.keystore.password</td><td>string</td><td>null</td><td></td><td>high</td><td>The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. </td></tr>
+<tr>
+<td>ssl.truststore.location</td><td>string</td><td>null</td><td></td><td>high</td><td>The location of the trust store file. </td></tr>
+<tr>
+<td>ssl.truststore.password</td><td>string</td><td>null</td><td></td><td>high</td><td>The password for the trust store file. </td></tr>
+<tr>
+<td>connections.max.idle.ms</td><td>long</td><td>540000</td><td></td><td>medium</td><td>Close idle connections after the number of milliseconds specified by this config.</td></tr>
+<tr>
+<td>receive.buffer.bytes</td><td>int</td><td>32768</td><td>[0,...]</td><td>medium</td><td>The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.</td></tr>
+<tr>
+<td>request.timeout.ms</td><td>int</td><td>40000</td><td>[0,...]</td><td>medium</td><td>The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.</td></tr>
+<tr>
+<td>sasl.kerberos.principal.to.local.rules</td><td>list</td><td>[DEFAULT]</td><td></td><td>medium</td><td>A list of rules for mapping from principal names to short names (typically operating system usernames). The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. By default, principal names of the form &lt;username&gt;/&lt;hostname&gt;@&lt;REALM&gt; are mapped to &lt;username&gt;.</td></tr>
+<tr>
+<td>sasl.kerberos.service.name</td><td>string</td><td>null</td><td></td><td>medium</td><td>The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config.</td></tr>
+<tr>
+<td>security.protocol</td><td>string</td><td>PLAINTEXT</td><td></td><td>medium</td><td>Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.</td></tr>
+<tr>
+<td>send.buffer.bytes</td><td>int</td><td>131072</td><td>[0,...]</td><td>medium</td><td>The size of the TCP send buffer (SO_SNDBUF) to use when sending data.</td></tr>
+<tr>
+<td>ssl.enabled.protocols</td><td>list</td><td>[TLSv1.2, TLSv1.1, TLSv1]</td><td></td><td>medium</td><td>The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.</td></tr>
+<tr>
+<td>ssl.keystore.type</td><td>string</td><td>JKS</td><td></td><td>medium</td><td>The file format of the key store file. This is optional for client. Default value is JKS</td></tr>
+<tr>
+<td>ssl.protocol</td><td>string</td><td>TLS</td><td></td><td>medium</td><td>The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.</td></tr>
+<tr>
+<td>ssl.provider</td><td>string</td><td>null</td><td></td><td>medium</td><td>The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.</td></tr>
+<tr>
+<td>ssl.truststore.type</td><td>string</td><td>JKS</td><td></td><td>medium</td><td>The file format of the trust store file. Default value is JKS.</td></tr>
+<tr>
+<td>worker.sync.timeout.ms</td><td>int</td><td>3000</td><td></td><td>medium</td><td>When the worker is out of sync with other workers and needs to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and waiting a backoff period before rejoining.</td></tr>
+<tr>
+<td>worker.unsync.backoff.ms</td><td>int</td><td>300000</td><td></td><td>medium</td><td>When the worker is out of sync with other workers and  fails to catch up within worker.sync.timeout.ms, leave the Connect cluster for this long before rejoining.</td></tr>
+<tr>
+<td>client.id</td><td>string</td><td>""</td><td></td><td>low</td><td>An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.</td></tr>
+<tr>
+<td>metadata.max.age.ms</td><td>long</td><td>300000</td><td>[0,...]</td><td>low</td><td>The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.</td></tr>
+<tr>
+<td>metric.reporters</td><td>list</td><td>[]</td><td></td><td>low</td><td>A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.</td></tr>
+<tr>
+<td>metrics.num.samples</td><td>int</td><td>2</td><td>[1,...]</td><td>low</td><td>The number of samples maintained to compute metrics.</td></tr>
+<tr>
+<td>metrics.sample.window.ms</td><td>long</td><td>30000</td><td>[0,...]</td><td>low</td><td>The window of time a metrics sample is computed over.</td></tr>
+<tr>
+<td>offset.flush.interval.ms</td><td>long</td><td>60000</td><td></td><td>low</td><td>Interval at which to try committing offsets for tasks.</td></tr>
+<tr>
+<td>offset.flush.timeout.ms</td><td>long</td><td>5000</td><td></td><td>low</td><td>Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt.</td></tr>
+<tr>
+<td>principal.builder.class</td><td>class</td><td>class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder</td><td></td><td>low</td><td>The principal builder to generate a java Principal. This config is optional for client.</td></tr>
+<tr>
+<td>reconnect.backoff.ms</td><td>long</td><td>50</td><td>[0,...]</td><td>low</td><td>The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.</td></tr>
+<tr>
+<td>rest.advertised.host.name</td><td>string</td><td>null</td><td></td><td>low</td><td>If this is set, this is the hostname that will be given out to other workers to connect to.</td></tr>
+<tr>
+<td>rest.advertised.port</td><td>int</td><td>null</td><td></td><td>low</td><td>If this is set, this is the port that will be given out to other workers to connect to.</td></tr>
+<tr>
+<td>rest.host.name</td><td>string</td><td>null</td><td></td><td>low</td><td>Hostname for the REST API. If this is set, it will only bind to this interface.</td></tr>
+<tr>
+<td>rest.port</td><td>int</td><td>8083</td><td></td><td>low</td><td>Port for the REST API to listen on.</td></tr>
+<tr>
+<td>retry.backoff.ms</td><td>long</td><td>100</td><td>[0,...]</td><td>low</td><td>The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.</td></tr>
+<tr>
+<td>sasl.kerberos.kinit.cmd</td><td>string</td><td>/usr/bin/kinit</td><td></td><td>low</td><td>Kerberos kinit command path. Default is /usr/bin/kinit</td></tr>
+<tr>
+<td>sasl.kerberos.min.time.before.relogin</td><td>long</td><td>60000</td><td></td><td>low</td><td>Login thread sleep time between refresh attempts.</td></tr>
+<tr>
+<td>sasl.kerberos.ticket.renew.jitter</td><td>double</td><td>0.05</td><td></td><td>low</td><td>Percentage of random jitter added to the renewal time.</td></tr>
+<tr>
+<td>sasl.kerberos.ticket.renew.window.factor</td><td>double</td><td>0.8</td><td></td><td>low</td><td>Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket.</td></tr>
+<tr>
+<td>ssl.cipher.suites</td><td>list</td><td>null</td><td></td><td>low</td><td>A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.</td></tr>
+<tr>
+<td>ssl.endpoint.identification.algorithm</td><td>string</td><td>null</td><td></td><td>low</td><td>The endpoint identification algorithm to validate server hostname using server certificate. </td></tr>
+<tr>
+<td>ssl.keymanager.algorithm</td><td>string</td><td>SunX509</td><td></td><td>low</td><td>The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.</td></tr>
+<tr>
+<td>ssl.trustmanager.algorithm</td><td>string</td><td>PKIX</td><td></td><td>low</td><td>The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.</td></tr>
+<tr>
+<td>task.shutdown.graceful.timeout.ms</td><td>long</td><td>5000</td><td></td><td>low</td><td>Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All tasks have shutdown triggered, then they are waited on sequentially.</td></tr>
+</table>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/consumer_config.html
----------------------------------------------------------------------
diff --git a/090/consumer_config.html b/090/consumer_config.html
new file mode 100644
index 0000000..7ad5a32
--- /dev/null
+++ b/090/consumer_config.html
@@ -0,0 +1,102 @@
+<table>
+<tr>
+<th>Name</th>
+<th>Type</th>
+<th>Default</th>
+<th>Valid Values</th>
+<th>Importance</th>
+<th>Description</th>
+</tr>
+<tr>
+<td>bootstrap.servers</td><td>list</td><td></td><td></td><td>high</td><td>A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping&mdash;this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form <code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).</td></tr>
+<tr>
+<td>key.deserializer</td><td>class</td><td></td><td></td><td>high</td><td>Deserializer class for key that implements the <code>Deserializer</code> interface.</td></tr>
+<tr>
+<td>value.deserializer</td><td>class</td><td></td><td></td><td>high</td><td>Deserializer class for value that implements the <code>Deserializer</code> interface.</td></tr>
+<tr>
+<td>fetch.min.bytes</td><td>int</td><td>1024</td><td>[0,...]</td><td>high</td><td>The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. A setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate, which can improve server throughput a bit at the cost of some additional latency.</td></tr>
+<tr>
+<td>group.id</td><td>string</td><td>""</td><td></td><td>high</td><td>A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.</td></tr>
+<tr>
+<td>heartbeat.interval.ms</td><td>int</td><td>3000</td><td></td><td>high</td><td>The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.</td></tr>
+<tr>
+<td>max.partition.fetch.bytes</td><td>int</td><td>1048576</td><td>[0,...]</td><td>high</td><td>The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.</td></tr>
+<tr>
+<td>session.timeout.ms</td><td>int</td><td>30000</td><td></td><td>high</td><td>The timeout used to detect failures when using Kafka's group management facilities.</td></tr>
+<tr>
+<td>ssl.key.password</td><td>string</td><td>null</td><td></td><td>high</td><td>The password of the private key in the key store file. This is optional for client.</td></tr>
+<tr>
+<td>ssl.keystore.location</td><td>string</td><td>null</td><td></td><td>high</td><td>The location of the key store file. This is optional for client and can be used for two-way authentication for client.</td></tr>
+<tr>
+<td>ssl.keystore.password</td><td>string</td><td>null</td><td></td><td>high</td><td>The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. </td></tr>
+<tr>
+<td>ssl.truststore.location</td><td>string</td><td>null</td><td></td><td>high</td><td>The location of the trust store file. </td></tr>
+<tr>
+<td>ssl.truststore.password</td><td>string</td><td>null</td><td></td><td>high</td><td>The password for the trust store file. </td></tr>
+<tr>
+<td>auto.offset.reset</td><td>string</td><td>latest</td><td>[latest, earliest, none]</td><td>medium</td><td>What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset<li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li></ul></td></tr>
+<tr>
+<td>connections.max.idle.ms</td><td>long</td><td>540000</td><td></td><td>medium</td><td>Close idle connections after the number of milliseconds specified by this config.</td></tr>
+<tr>
+<td>enable.auto.commit</td><td>boolean</td><td>true</td><td></td><td>medium</td><td>If true the consumer's offset will be periodically committed in the background.</td></tr>
+<tr>
+<td>partition.assignment.strategy</td><td>list</td><td>[org.apache.kafka.clients.consumer.RangeAssignor]</td><td></td><td>medium</td><td>The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used</td></tr>
+<tr>
+<td>receive.buffer.bytes</td><td>int</td><td>32768</td><td>[0,...]</td><td>medium</td><td>The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.</td></tr>
+<tr>
+<td>request.timeout.ms</td><td>int</td><td>40000</td><td>[0,...]</td><td>medium</td><td>The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.</td></tr>
+<tr>
+<td>sasl.kerberos.service.name</td><td>string</td><td>null</td><td></td><td>medium</td><td>The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config.</td></tr>
+<tr>
+<td>security.protocol</td><td>string</td><td>PLAINTEXT</td><td></td><td>medium</td><td>Protocol used to communicate with brokers. Currently only PLAINTEXT and SSL are supported.</td></tr>
+<tr>
+<td>send.buffer.bytes</td><td>int</td><td>131072</td><td>[0,...]</td><td>medium</td><td>The size of the TCP send buffer (SO_SNDBUF) to use when sending data.</td></tr>
+<tr>
+<td>ssl.enabled.protocols</td><td>list</td><td>[TLSv1.2, TLSv1.1, TLSv1]</td><td></td><td>medium</td><td>The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.</td></tr>
+<tr>
+<td>ssl.keystore.type</td><td>string</td><td>JKS</td><td></td><td>medium</td><td>The file format of the key store file. This is optional for client. Default value is JKS</td></tr>
+<tr>
+<td>ssl.protocol</td><td>string</td><td>TLS</td><td></td><td>medium</td><td>The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.</td></tr>
+<tr>
+<td>ssl.provider</td><td>string</td><td>null</td><td></td><td>medium</td><td>The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.</td></tr>
+<tr>
+<td>ssl.truststore.type</td><td>string</td><td>JKS</td><td></td><td>medium</td><td>The file format of the trust store file. Default value is JKS.</td></tr>
+<tr>
+<td>auto.commit.interval.ms</td><td>long</td><td>5000</td><td>[0,...]</td><td>low</td><td>The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if <code>enable.auto.commit</code> is set to <code>true</code>.</td></tr>
+<tr>
+<td>check.crcs</td><td>boolean</td><td>true</td><td></td><td>low</td><td>Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.</td></tr>
+<tr>
+<td>client.id</td><td>string</td><td>""</td><td></td><td>low</td><td>An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.</td></tr>
+<tr>
+<td>fetch.max.wait.ms</td><td>int</td><td>500</td><td>[0,...]</td><td>low</td><td>The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.</td></tr>
+<tr>
+<td>metadata.max.age.ms</td><td>long</td><td>300000</td><td>[0,...]</td><td>low</td><td>The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.</td></tr>
+<tr>
+<td>metric.reporters</td><td>list</td><td>[]</td><td></td><td>low</td><td>A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.</td></tr>
+<tr>
+<td>metrics.num.samples</td><td>int</td><td>2</td><td>[1,...]</td><td>low</td><td>The number of samples maintained to compute metrics.</td></tr>
+<tr>
+<td>metrics.sample.window.ms</td><td>long</td><td>30000</td><td>[0,...]</td><td>low</td><td>The window of time a metrics sample is computed over.</td></tr>
+<tr>
+<td>principal.builder.class</td><td>class</td><td>class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder</td><td></td><td>low</td><td>The principal builder to generate a java Principal. This config is optional for client.</td></tr>
+<tr>
+<td>reconnect.backoff.ms</td><td>long</td><td>50</td><td>[0,...]</td><td>low</td><td>The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.</td></tr>
+<tr>
+<td>retry.backoff.ms</td><td>long</td><td>100</td><td>[0,...]</td><td>low</td><td>The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.</td></tr>
+<tr>
+<td>sasl.kerberos.kinit.cmd</td><td>string</td><td>/usr/bin/kinit</td><td></td><td>low</td><td>Kerberos kinit command path. Default is /usr/bin/kinit</td></tr>
+<tr>
+<td>sasl.kerberos.min.time.before.relogin</td><td>long</td><td>60000</td><td></td><td>low</td><td>Login thread sleep time between refresh attempts.</td></tr>
+<tr>
+<td>sasl.kerberos.ticket.renew.jitter</td><td>double</td><td>0.05</td><td></td><td>low</td><td>Percentage of random jitter added to the renewal time.</td></tr>
+<tr>
+<td>sasl.kerberos.ticket.renew.window.factor</td><td>double</td><td>0.8</td><td></td><td>low</td><td>Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket.</td></tr>
+<tr>
+<td>ssl.cipher.suites</td><td>list</td><td>null</td><td></td><td>low</td><td>A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.</td></tr>
+<tr>
+<td>ssl.endpoint.identification.algorithm</td><td>string</td><td>null</td><td></td><td>low</td><td>The endpoint identification algorithm to validate server hostname using server certificate. </td></tr>
+<tr>
+<td>ssl.keymanager.algorithm</td><td>string</td><td>SunX509</td><td></td><td>low</td><td>The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.</td></tr>
+<tr>
+<td>ssl.trustmanager.algorithm</td><td>string</td><td>PKIX</td><td></td><td>low</td><td>The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.</td></tr>
+</table>
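+
+<p>
+To give a rough sense of how the high-importance settings above fit together, here is a minimal, hypothetical sketch of a consumer configuration using them (the broker address, group id and topic name are placeholders):
+</p>
+<pre>
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9092");   // initial hosts used to discover the full cluster
+props.put("group.id", "my-group");                  // required when using subscribe()
+props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
+consumer.subscribe(Arrays.asList("my-topic"));
+ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);   // wait up to 100 ms for data
+for (ConsumerRecord&lt;String, String&gt; record : records)
+    System.out.println(record.offset() + ": " + record.value());
+consumer.close();
+</pre>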

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/design.html
----------------------------------------------------------------------
diff --git a/090/design.html b/090/design.html
index fbb1d5f..347f602 100644
--- a/090/design.html
+++ b/090/design.html
@@ -1,3 +1,20 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+ 
+    http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
 <h3><a id="majordesignelements">4.1 Motivation</a></h3>
 <p>
 We designed Kafka to be able to act as a unified platform for handling all the real-time data feeds <a href="#introduction">a large company might have</a>. To do this we had to think through a fairly broad set of use cases.
@@ -10,7 +27,7 @@ It also meant the system would have to handle low-latency delivery to handle mor
 <p>
 We wanted to support partitioned, distributed, real-time processing of these feeds to create new, derived feeds. This motivated our partitioning and consumer model.
 <p>
-Finally in cases where the stream is fed into other data systems for serving we new the system would have to be able to guarantee fault-tolerance in the presence of machine failures.
+Finally in cases where the stream is fed into other data systems for serving we knew the system would have to be able to guarantee fault-tolerance in the presence of machine failures.
 <p>
 Supporting these uses led us to a design with a number of unique elements, more akin to a database log than a traditional messaging system. We will outline some elements of the design in the following sections.
 
@@ -289,7 +306,7 @@ This functionality is inspired by one of LinkedIn's oldest and most successful p
 
 Here is a high-level picture that shows the logical structure of a Kafka log with the offset for each message.
 <p>
-<img src="/images/log_cleaner_anatomy.png">
+<img src="images/log_cleaner_anatomy.png">
 <p>
 The head of the log is identical to a traditional Kafka log. It has dense, sequential offsets and retains all messages. Log compaction adds an option for handling the tail of the log. The picture above shows a log with a compacted tail. Note that the messages in the tail of the log retain the original offset assigned when they were first written&mdash;that never changes. Note also that all offsets remain valid positions in the log, even if the message with that offset has been compacted away; in this case this position is indistinguishable from the next highest offset that does appear in the log. For example, in the picture above the offsets 36, 37, and 38 are all equivalent positions and a read beginning at any of these offsets would return a message set beginning with 38.
 <p>
@@ -297,7 +314,7 @@ Compaction also allows for deletes. A message with a key and a null payload will
 <p>
 The compaction is done in the background by periodically recopying log segments. Cleaning does not block reads and can be throttled to use no more than a configurable amount of I/O throughput to avoid impacting producers and consumers. The actual process of compacting a log segment looks something like this:
 <p>
-<img src="/images/log_compaction.png">
+<img src="images/log_compaction.png">
 <p>
 <h4>What guarantees does log compaction provide?</h4>
 
@@ -336,3 +353,28 @@ Further cleaner configurations are described <a href="/documentation.html#broker
   <li>You cannot configure yet how much log is retained without compaction (the "head" of the log).  Currently all segments are eligible except for the last segment, i.e. the one currently being written to.</li>
   <li>Log compaction is not yet compatible with compressed topics.</li>
 </ol>
+<h3><a id="semantics">4.9 Quotas</a></h3>
+<p>
+    Starting in 0.9, the Kafka cluster has the ability to enforce quotas on produce and fetch requests. Quotas are byte-rate thresholds defined per client-id. A client-id logically identifies an application making a request, so a single client-id can span multiple producer and consumer instances and the quota will apply to all of them as a single entity, i.e. if client-id="test-client" has a produce quota of 10MB/sec, this is shared across all instances with that same id.
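+<p>
+    As a minimal, hypothetical sketch of what sharing a client-id means (the broker address and topic below are placeholders), every client constructed with the same <code>client.id</code> draws from the same per-broker quota:
+</p>
+<pre>
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9092");
+props.put("client.id", "test-client");   // quota is tracked per client-id, not per instance
+props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+// Both producers share the single produce quota for "test-client" on each broker;
+// adding more instances with this client-id does not add quota.
+KafkaProducer&lt;String, String&gt; producerA = new KafkaProducer&lt;&gt;(props);
+KafkaProducer&lt;String, String&gt; producerB = new KafkaProducer&lt;&gt;(props);
+producerA.send(new ProducerRecord&lt;String, String&gt;("test-topic", "key", "value"));
+producerB.send(new ProducerRecord&lt;String, String&gt;("test-topic", "key", "value"));
+producerA.close();
+producerB.close();
+</pre>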
+
+<h4>Why are quotas necessary?</h4>
+<p>
+It is possible for producers and consumers to produce/consume very high volumes of data and thus monopolize broker resources, cause network saturation and generally DoS other clients and the brokers themselves. Having quotas protects against these issues and is all the more important in large multi-tenant clusters where a small set of badly behaved clients can degrade user experience for the well behaved ones. In fact, when running Kafka as a service this even makes it possible to enforce API limits according to an agreed upon contract.
+</p>
+<h4>Enforcement</h4>
+<p>
+    By default, each unique client-id receives a fixed quota in bytes/sec as configured by the cluster (quota.producer.default, quota.consumer.default).
+    This quota is defined on a per-broker basis. Each client can publish/fetch a maximum of X bytes/sec per broker before it gets throttled. We decided that defining these quotas per broker is much better than having a fixed cluster wide bandwidth per client because that would require a mechanism to share client quota usage among all the brokers. This can be harder to get right than the quota implementation itself!
+</p>
+<p>
+    How does a broker react when it detects a quota violation? In our solution, the broker does not return an error; rather, it attempts to slow down a client exceeding its quota. It computes the amount of delay needed to bring a guilty client under its quota and delays the response for that time. This approach keeps the quota violation transparent to clients (outside of client-side metrics). It also keeps them from having to implement any special backoff and retry behavior, which can get tricky. In fact, bad client behavior (retry without backoff) can exacerbate the very problem quotas are trying to solve.
+</p>
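+<p>
+    As a rough, hypothetical sketch of the idea (not the broker's actual implementation), the delay can be derived from the observed byte count, the quota, and the length of the measurement period:
+</p>
+<pre>
+// Sketch only: given the bytes observed over a measurement period and a quota in bytes/sec,
+// return the delay (in ms) that brings the average rate back under the quota.
+static long throttleTimeMs(long observedBytes, double quotaBytesPerSec, long measurementMs) {
+    double observedRate = observedBytes * 1000.0 / measurementMs;
+    if (observedRate &lt;= quotaBytesPerSec)
+        return 0;
+    // Spreading observedBytes over (measurementMs + delay) milliseconds matches the quota exactly.
+    return (long) (observedBytes * 1000.0 / quotaBytesPerSec) - measurementMs;
+}
+</pre>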
+<p>
+Client byte rate is measured over multiple small windows (e.g. 30 windows of 1 second each) in order to detect and correct quota violations quickly. Typically, having large measurement windows (e.g. 10 windows of 30 seconds each) leads to large bursts of traffic followed by long delays, which is not great in terms of user experience.
+</p>
+<h4>Quota overrides</h4>
+<p>
+    It is possible to override the default quota for client-ids that need a higher (or even lower) quota. The mechanism is similar to the per-topic log config overrides.
+    Client-id overrides are written to ZooKeeper under <i><b>/config/clients</b></i>. These overrides are read by all brokers and are effective immediately. This lets us change quotas without having to do a rolling restart of the entire cluster. See <a href="/ops.html#quotas">here</a> for details.
+
+</p>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/documentation.html
----------------------------------------------------------------------
diff --git a/090/documentation.html b/090/documentation.html
index 52199d7..c64e67f 100644
--- a/090/documentation.html
+++ b/090/documentation.html
@@ -1,9 +1,26 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
 <!--#include virtual="../includes/header.html" -->
 
-<h1>Kafka 0.8.2 Documentation</h1>
-Prior releases: <a href="/07/documentation.html">0.7.x</a>, <a href="/08/documentation.html">0.8.0</a>, <a href="/081/documentation.html">0.8.1.X</a>.
+<h1>Kafka 0.9.0 Documentation</h1>
+Prior releases: <a href="/07/documentation.html">0.7.x</a>, <a href="/08/documentation.html">0.8.0</a>, <a href="/081/documentation.html">0.8.1.X</a>, <a href="/082/documentation.html">0.8.2.X</a>.
 </ul>
-    
+
 <ul class="toc">
     <li><a href="#gettingStarted">1. Getting Started</a>
          <ul>
@@ -13,20 +30,24 @@ Prior releases: <a href="/07/documentation.html">0.7.x</a>, <a href="/08/documen
              <li><a href="#ecosystem">1.4 Ecosystem</a>
              <li><a href="#upgrade">1.5 Upgrading</a>
          </ul>
+    </li>
     <li><a href="#api">2. API</a>
           <ul>
               <li><a href="#producerapi">2.1 Producer API</a>
               <li><a href="#highlevelconsumerapi">2.2 High Level Consumer API</a>
               <li><a href="#simpleconsumerapi">2.3 Simple Consumer API</a>
-              <li><a href="#kafkahadoopconsumerapi">2.4 Kafka Hadoop Consumer API</a>
+              <li><a href="#newconsumerapi">2.4 New Consumer API</a>
           </ul>
+    </li>
     <li><a href="#configuration">3. Configuration</a>
         <ul>
-             <li><a href="#brokerconfigs">3.1 Broker Configs</a>
-             <li><a href="#consumerconfigs">3.2 Consumer Configs</a>
-             <li><a href="#producerconfigs">3.3 Producer Configs</a>
-			 <li><a href="#newproducerconfigs">3.4 New Producer Configs</a>
+            <li><a href="#brokerconfigs">3.1 Broker Configs</a>
+            <li><a href="#producerconfigs">3.2 Producer Configs</a>
+            <li><a href="#consumerconfigs">3.3 Consumer Configs</a>
+            <li><a href="#newconsumerconfigs">3.4 New Consumer Configs</a>
+            <li><a href="#connectconfigs">3.5 Kafka Connect Configs</a>
         </ul>
+    </li>
     <li><a href="#design">4. Design</a>
         <ul>
              <li><a href="#majordesignelements">4.1 Motivation</a>
@@ -38,6 +59,7 @@ Prior releases: <a href="/07/documentation.html">0.7.x</a>, <a href="/08/documen
              <li><a href="#replication">4.7 Replication</a>
              <li><a href="#compaction">4.8 Log Compaction</a>
         </ul>
+    </li>
     <li><a href="#implementation">5. Implementation</a>
         <ul>
               <li><a href="#apidesign">5.1 API Design</a>
@@ -47,6 +69,7 @@ Prior releases: <a href="/07/documentation.html">0.7.x</a>, <a href="/08/documen
               <li><a href="#log">5.5 Log</a>
               <li><a href="#distributionimpl">5.6 Distribution</a>
         </ul>
+    </li>
     <li><a href="#operations">6. Operations</a>
         <ul>
              <li><a href="#basic_ops">6.1 Basic Kafka Operations</a>
@@ -84,6 +107,22 @@ Prior releases: <a href="/07/documentation.html">0.7.x</a>, <a href="/08/documen
                     <li><a href="#zkops">Operationalization</a>
                 </ul>
         </ul>
+    </li>
+    <li><a href="#security">7. Security</a>
+        <ul>
+            <li><a href="#security_overview">7.1 Security Overview</a></li>
+            <li><a href="#security_ssl">7.2 Encryption and Authentication using SSL</a></li>
+            <li><a href="#security_sasl">7.3 Authentication using SASL</a></li>
+            <li><a href="#security_authz">7.4 Authorization and ACLs</a></li>
+        </ul>
+    </li>
+    <li><a href="#connect">8. Kafka Connect</a>
+        <ul>
+            <li><a href="#connect_overview">8.1 Overview</a></li>
+            <li><a href="#connect_user">8.2 User Guide</a></li>
+            <li><a href="#connect_development">8.3 Connector Development Guide</a></li>
+        </ul>
+    </li>
 </ul>
 
 <h2><a id="gettingStarted">1. Getting Started</a></h2>
@@ -113,4 +152,10 @@ Prior releases: <a href="/07/documentation.html">0.7.x</a>, <a href="/08/documen
 
 <!--#include virtual="ops.html" -->
 
+<h2><a id="security">7. Security</a></h2>
+<!--#include virtual="security.html" -->
+
+<h2><a id="connect">8. Kafka Connect</a></h2>
+<!--#include virtual="connect.html" -->
+
 <!--#include virtual="../includes/footer.html" -->

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/ecosystem.html
----------------------------------------------------------------------
diff --git a/090/ecosystem.html b/090/ecosystem.html
index eb41338..e99a446 100644
--- a/090/ecosystem.html
+++ b/090/ecosystem.html
@@ -1,3 +1,20 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+ 
+    http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
 <h3><a id="ecosystem">1.4 Ecosystem</a></h3>
 
 There are a plethora of tools that integrate with Kafka outside the main distribution. The <a href="https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem"> ecosystem page</a> lists many of these, including stream processing systems, Hadoop integration, monitoring, and deployment tools.

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/images/consumer-groups.png
----------------------------------------------------------------------
diff --git a/090/images/consumer-groups.png b/090/images/consumer-groups.png
new file mode 100644
index 0000000..16fe293
Binary files /dev/null and b/090/images/consumer-groups.png differ

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/images/kafka_log.png
----------------------------------------------------------------------
diff --git a/090/images/kafka_log.png b/090/images/kafka_log.png
new file mode 100644
index 0000000..75abd96
Binary files /dev/null and b/090/images/kafka_log.png differ

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/images/kafka_multidc.png
----------------------------------------------------------------------
diff --git a/090/images/kafka_multidc.png b/090/images/kafka_multidc.png
new file mode 100644
index 0000000..7bc56f4
Binary files /dev/null and b/090/images/kafka_multidc.png differ

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/images/kafka_multidc_complex.png
----------------------------------------------------------------------
diff --git a/090/images/kafka_multidc_complex.png b/090/images/kafka_multidc_complex.png
new file mode 100644
index 0000000..ab88deb
Binary files /dev/null and b/090/images/kafka_multidc_complex.png differ

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/images/log_anatomy.png
----------------------------------------------------------------------
diff --git a/090/images/log_anatomy.png b/090/images/log_anatomy.png
new file mode 100644
index 0000000..a649499
Binary files /dev/null and b/090/images/log_anatomy.png differ

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/images/log_cleaner_anatomy.png
----------------------------------------------------------------------
diff --git a/090/images/log_cleaner_anatomy.png b/090/images/log_cleaner_anatomy.png
new file mode 100644
index 0000000..fb425b0
Binary files /dev/null and b/090/images/log_cleaner_anatomy.png differ

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/images/log_compaction.png
----------------------------------------------------------------------
diff --git a/090/images/log_compaction.png b/090/images/log_compaction.png
new file mode 100644
index 0000000..4e4a833
Binary files /dev/null and b/090/images/log_compaction.png differ

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/images/mirror-maker.png
----------------------------------------------------------------------
diff --git a/090/images/mirror-maker.png b/090/images/mirror-maker.png
new file mode 100644
index 0000000..8f76b1f
Binary files /dev/null and b/090/images/mirror-maker.png differ

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/images/producer_consumer.png
----------------------------------------------------------------------
diff --git a/090/images/producer_consumer.png b/090/images/producer_consumer.png
new file mode 100644
index 0000000..4b10cc9
Binary files /dev/null and b/090/images/producer_consumer.png differ

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/images/tracking_high_level.png
----------------------------------------------------------------------
diff --git a/090/images/tracking_high_level.png b/090/images/tracking_high_level.png
new file mode 100644
index 0000000..b643230
Binary files /dev/null and b/090/images/tracking_high_level.png differ

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/implementation.html
----------------------------------------------------------------------
diff --git a/090/implementation.html b/090/implementation.html
index 3b878af..b95d36f 100644
--- a/090/implementation.html
+++ b/090/implementation.html
@@ -1,12 +1,29 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
 <h3><a id="apidesign">5.1 API Design</a></h3>
 
 <h4>Producer APIs</h4>
 
 <p>
-The Producer API that wraps the 2 low-level producers - <code>kafka.producer.SyncProducer</code> and <code>kafka.producer.async.AsyncProducer</code>. 
+The Producer API wraps the two low-level producers - <code>kafka.producer.SyncProducer</code> and <code>kafka.producer.async.AsyncProducer</code>.
 <pre>
 class Producer<T> {
-	
+
   /* Sends the data, partitioned by key to the topic using either the */
   /* synchronous or the asynchronous producer */
   public void send(kafka.javaapi.producer.ProducerData&lt;K,V&gt; producerData);
@@ -15,21 +32,21 @@ class Producer<T> {
   /* the synchronous or the asynchronous producer */
   public void send(java.util.List&lt;kafka.javaapi.producer.ProducerData&lt;K,V&gt;&gt; producerData);
 
-  /* Closes the producer and cleans up */	
+  /* Closes the producer and cleans up */
   public void close();
 
 }
 </pre>
 
-The goal is to expose all the producer functionality through a single API to the client.  
+The goal is to expose all the producer functionality through a single API to the client.
 
 The new producer -
 <ul>
-<li>can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data - 	
+<li>can handle queueing/buffering of multiple producer requests and asynchronous dispatch of the batched data -
 <p><code>kafka.producer.Producer</code> provides the ability to batch multiple produce requests (<code>producer.type=async</code>), before serializing and dispatching them to the appropriate kafka broker partition. The size of the batch can be controlled by a few config parameters. As events enter a queue, they are buffered in a queue, until either <code>queue.time</code> or <code>batch.size</code> is reached. A background thread (<code>kafka.producer.async.ProducerSendThread</code>) dequeues the batch of data and lets the <code>kafka.producer.EventHandler</code> serialize and send the data to the appropriate kafka broker partition. A custom event handler can be plugged in through the <code>event.handler</code> config parameter. At various stages of this producer queue pipeline, it is helpful to be able to inject callbacks, either for plugging in custom logging/tracing code or custom monitoring logic. This is possible by implementing the <code>kafka.producer.async.CallbackHandler</code> interface and setting <code>callback.handler</code> config parameter to that class.
 </p>
 </li>
-<li>handles the serialization of data through a user-specified <code>Encoder</code> - 
+<li>handles the serialization of data through a user-specified <code>Encoder</code> -
 <pre>
 interface Encoder&lt;T&gt; {
   public Message toMessage(T data);
@@ -37,15 +54,15 @@ interface Encoder&lt;T&gt; {
 </pre>
 <p>The default is the no-op <code>kafka.serializer.DefaultEncoder</code></p>
 </li>
-<li>provides software load balancing through an optionally user-specified <code>Partitioner</code> - 
+<li>provides software load balancing through an optionally user-specified <code>Partitioner</code> -
 <p>
-The routing decision is influenced by the <code>kafka.producer.Partitioner</code>. 
+The routing decision is influenced by the <code>kafka.producer.Partitioner</code>.
 <pre>
 interface Partitioner&lt;T&gt; {
    int partition(T key, int numPartitions);
 }
 </pre>
-The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is <code>hash(key)%numPartitions</code>. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the <code>partitioner.class</code> config parameter.	
+The partition API uses the key and the number of available broker partitions to return a partition id. This id is used as an index into a sorted list of broker_ids and partitions to pick a broker partition for the producer request. The default partitioning strategy is <code>hash(key)%numPartitions</code>. If the key is null, then a random broker partition is picked. A custom partitioning strategy can also be plugged in using the <code>partitioner.class</code> config parameter.
 </p>
 </li>
 </ul>
@@ -62,11 +79,11 @@ The high-level API hides the details of brokers from the consumer and allows con
 <h5>Low-level API</h5>
 <pre>
 class SimpleConsumer {
-	
-  /* Send fetch request to a broker and get back a set of messages. */ 
+
+  /* Send fetch request to a broker and get back a set of messages. */
   public ByteBufferMessageSet fetch(FetchRequest request);
 
-  /* Send a list of fetch requests to a broker and get back a response set. */ 
+  /* Send a list of fetch requests to a broker and get back a response set. */
   public MultiFetchResponse multifetch(List&lt;FetchRequest&gt; fetches);
 
   /**
@@ -80,16 +97,16 @@ class SimpleConsumer {
 }
 </pre>
 
-The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers (such as the hadoop consumer) which have particular requirements around maintaining state.
+The low-level API is used to implement the high-level API as well as being used directly for some of our offline consumers which have particular requirements around maintaining state.
 
 <h5>High-level API</h5>
 <pre>
 
-/* create a connection to the cluster */ 
+/* create a connection to the cluster */
 ConsumerConnector connector = Consumer.create(consumerConfig);
 
 interface ConsumerConnector {
-	
+
   /**
    * This method is used to get a list of KafkaStreams, which are iterators over
    * MessageAndMetadata objects from which you can obtain messages and their
@@ -97,7 +114,7 @@ interface ConsumerConnector {
    *  Input: a map of &lt;topic, #streams&gt;
    *  Output: a map of &lt;topic, list of message streams&gt;
    */
-  public Map&lt;String,List&lt;KafkaStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt; topicCountMap); 
+  public Map&lt;String,List&lt;KafkaStream&gt;&gt; createMessageStreams(Map&lt;String,Int&gt; topicCountMap);
 
   /**
    * You can also obtain a list of KafkaStreams, that iterate over messages
@@ -109,7 +126,7 @@ interface ConsumerConnector {
 
   /* Commit the offsets of all messages consumed so far. */
   public commitOffsets()
-  
+
   /* Shut down the connector */
   public shutdown()
 }
@@ -132,27 +149,27 @@ Messages consist of a fixed-size header and variable length opaque byte array pa
 <h3><a id="messageformat">5.4 Message Format</a></h3>
 
 <pre>
-	/** 
-	 * A message. The format of an N byte message is the following: 
-	 * 
-	 * If magic byte is 0 
-	 * 
-	 * 1. 1 byte "magic" identifier to allow format changes 
-	 * 
-	 * 2. 4 byte CRC32 of the payload 
-	 * 
-	 * 3. N - 5 byte payload 
-	 * 
-	 * If magic byte is 1 
-	 * 
-	 * 1. 1 byte "magic" identifier to allow format changes 
-	 * 
-	 * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used) 
-	 * 
-	 * 3. 4 byte CRC32 of the payload 
-	 * 
-	 * 4. N - 6 byte payload 
-	 * 
+	/**
+	 * A message. The format of an N byte message is the following:
+	 *
+	 * If magic byte is 0
+	 *
+	 * 1. 1 byte "magic" identifier to allow format changes
+	 *
+	 * 2. 4 byte CRC32 of the payload
+	 *
+	 * 3. N - 5 byte payload
+	 *
+	 * If magic byte is 1
+	 *
+	 * 1. 1 byte "magic" identifier to allow format changes
+	 *
+	 * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
+	 *
+	 * 3. 4 byte CRC32 of the payload
+	 *
+	 * 4. N - 6 byte payload
+	 *
 	 */
 </pre>
 </p>
@@ -166,7 +183,7 @@ The exact binary format for messages is versioned and maintained as a standard i
 <pre>
 On-disk format of a message
 
-message length : 4 bytes (value: 1+4+n) 
+message length : 4 bytes (value: 1+4+n)
 "magic" value  : 1 byte
 crc            : 4 bytes
 payload        : n bytes
@@ -174,7 +191,7 @@ payload        : n bytes
 <p>
 The use of the message offset as the message id is unusual. Our original idea was to use a GUID generated by the producer, and maintain a mapping from GUID to offset on each broker. But since a consumer must maintain an ID for each server, the global uniqueness of the GUID provides no value. Furthermore the complexity of maintaining the mapping from a random id to an offset requires a heavy weight index structure which must be synchronized with disk, essentially requiring a full persistent random-access data structure. Thus to simplify the lookup structure we decided to use a simple per-partition atomic counter which could be coupled with the partition id and node id to uniquely identify a message; this makes the lookup structure simpler, though multiple seeks per consumer request are still likely. However once we settled on a counter, the jump to directly using the offset seemed natural&mdash;both after all are monotonically increasing integers unique to a partition. Since the offset is hidden from the consumer API this decision is ultimately an implementation detail and we went with the more efficient approach.
 </p>
-<img src="../images/kafka_log.png">
+<img src="images/kafka_log.png">
 <h4>Writes</h4>
 <p>
 The log allows serial appends which always go to the last file. This file is rolled over to a fresh file when it reaches a configurable size (say 1GB). The log takes two configuration parameters: <i>M</i>, which gives the number of messages to write before forcing the OS to flush the file to disk, and <i>S</i>, which gives a number of seconds after which a flush is forced. This gives a durability guarantee of losing at most <i>M</i> messages or <i>S</i> seconds of data in the event of a system crash.
@@ -272,7 +289,7 @@ When an element in a path is denoted [xyz], that means that the value of xyz is
 This is a list of all present broker nodes, each of which provides a unique logical broker id which identifies it to consumers (which must be given as part of its configuration). On startup, a broker node registers itself by creating a znode with the logical broker id under /brokers/ids. The purpose of the logical broker id is to allow a broker to be moved to a different physical machine without affecting consumers. An attempt to register a broker id that is already in use (say because two servers are configured with the same broker id) is an error.
 </p>
 <p>
-Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).	
+Since the broker registers itself in ZooKeeper using ephemeral znodes, this registration is dynamic and will disappear if the broker is shutdown or dies (thus notifying consumers it is no longer available).
 </p>
 <h4>Broker Topic Registry</h4>
 <pre>
@@ -289,7 +306,7 @@ Consumers of topics also register themselves in ZooKeeper, in order to coordinat
 </p>
 
 <p>
-Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id. 
+Multiple consumers can form a group and jointly consume a single topic. Each consumer in the same group is given a shared group_id.
 For example if one consumer is your foobar process, which is run across three machines, then you might assign this group of consumers the id "foobar". This group id is provided in the configuration of the consumer, and is your way to tell the consumer which group it belongs to.
 </p>
 
@@ -354,7 +371,7 @@ The consumer rebalancing algorithms allows all the consumers in a group to come
 Each consumer does the following during rebalancing:
 </p>
 <pre>
-   1. For each topic T that C<sub>i</sub> subscribes to 
+   1. For each topic T that C<sub>i</sub> subscribes to
    2.   let P<sub>T</sub> be all partitions producing topic T
    3.   let C<sub>G</sub> be all consumers in the same group as C<sub>i</sub> that consume topic T
    4.   sort P<sub>T</sub> (so partitions on the same broker are clustered together)

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/introduction.html
----------------------------------------------------------------------
diff --git a/090/introduction.html b/090/introduction.html
index a182182..7e0b150 100644
--- a/090/introduction.html
+++ b/090/introduction.html
@@ -1,3 +1,20 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+ 
+    http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
 <h3><a id="introduction">1.1 Introduction</a></h3>
 Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
 <p>
@@ -13,7 +30,7 @@ First let's review some basic messaging terminology:
 
 So, at a high level, producers send messages over the network to the Kafka cluster which in turn serves them up to consumers like this:
 <div style="text-align: center; width: 100%">
-  <img src="../images/producer_consumer.png">
+  <img src="images/producer_consumer.png">
 </div>
 
 Communication between the clients and the servers is done with a simple, high-performance, language agnostic <a href="https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol">TCP protocol</a>. We provide a Java client for Kafka, but clients are available in <a href="https://cwiki.apache.org/confluence/display/KAFKA/Clients">many languages</a>.
@@ -23,7 +40,7 @@ Let's first dive into the high-level abstraction Kafka provides&mdash;the topic.
 <p>
 A topic is a category or feed name to which messages are published. For each topic, the Kafka cluster maintains a partitioned log that looks like this:
 <div style="text-align: center; width: 100%">
-  <img src="../images/log_anatomy.png">
+  <img src="images/log_anatomy.png">
 </div>
 Each partition is an ordered, immutable sequence of messages that is continually appended to&mdash;a commit log. The messages in the partitions are each assigned a sequential id number called the <i>offset</i> that uniquely identifies each message within the partition.
 <p>
@@ -59,7 +76,7 @@ More commonly, however, we have found that topics have a small number of consume
 <p>
 
 <div style="float: right; margin: 20px; width: 500px" class="caption">
-  <img src="../images/consumer-groups.png"><br>
+  <img src="images/consumer-groups.png"><br>
   A two server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.
 </div>
 <p>

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/kafka_config.html
----------------------------------------------------------------------
diff --git a/090/kafka_config.html b/090/kafka_config.html
new file mode 100644
index 0000000..2125b6e
--- /dev/null
+++ b/090/kafka_config.html
@@ -0,0 +1,268 @@
+<table>
+<tr>
+<th>Name</th>
+<th>Type</th>
+<th>Default</th>
+<th>Valid Values</th>
+<th>Importance</th>
+<th>Description</th>
+</tr>
+<tr>
+<td>zookeeper.connect</td><td>string</td><td></td><td></td><td>high</td><td>Zookeeper host string</td></tr>
+<tr>
+<td>advertised.host.name</td><td>string</td><td>null</td><td></td><td>high</td><td>Hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, it will use the value for "host.name" if configured. Otherwise it will use the value returned from java.net.InetAddress.getCanonicalHostName().</td></tr>
+<tr>
+<td>advertised.listeners</td><td>string</td><td>null</td><td></td><td>high</td><td>Listeners to publish to ZooKeeper for clients to use, if different than the listeners above. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for "listeners" will be used.</td></tr>
+<tr>
+<td>advertised.port</td><td>int</td><td>null</td><td></td><td>high</td><td>The port to publish to ZooKeeper for clients to use. In IaaS environments, this may need to be different from the port to which the broker binds. If this is not set, it will publish the same port that the broker binds to.</td></tr>
+<tr>
+<td>auto.create.topics.enable</td><td>boolean</td><td>true</td><td></td><td>high</td><td>Enable auto creation of topic on the server</td></tr>
+<tr>
+<td>auto.leader.rebalance.enable</td><td>boolean</td><td>true</td><td></td><td>high</td><td>Enables auto leader balancing. A background thread checks and triggers leader balance if required at regular intervals</td></tr>
+<tr>
+<td>background.threads</td><td>int</td><td>10</td><td>[1,...]</td><td>high</td><td>The number of threads to use for various background processing tasks</td></tr>
+<tr>
+<td>broker.id</td><td>int</td><td>-1</td><td></td><td>high</td><td>The broker id for this server. To avoid conflicts between ZooKeeper-generated broker ids and user-configured broker ids, generated broker ids start from reserved.broker.max.id + 1.</td></tr>
+<tr>
+<td>compression.type</td><td>string</td><td>producer</td><td></td><td>high</td><td>Specify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.</td></tr>
+<tr>
+<td>delete.topic.enable</td><td>boolean</td><td>false</td><td></td><td>high</td><td>Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off</td></tr>
+<tr>
+<td>host.name</td><td>string</td><td>""</td><td></td><td>high</td><td>hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces</td></tr>
+<tr>
+<td>leader.imbalance.check.interval.seconds</td><td>long</td><td>300</td><td></td><td>high</td><td>The frequency with which the partition rebalance check is triggered by the controller</td></tr>
+<tr>
+<td>leader.imbalance.per.broker.percentage</td><td>int</td><td>10</td><td></td><td>high</td><td>The ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage.</td></tr>
+<tr>
+<td>listeners</td><td>string</td><td>null</td><td></td><td>high</td><td>Listener List - Comma-separated list of URIs we will listen on and their protocols.
+ Specify hostname as 0.0.0.0 to bind to all interfaces.
+ Leave hostname empty to bind to default interface.
+ Examples of legal listener lists:
+ PLAINTEXT://myhost:9092,TRACE://:9091
+ PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093
+</td></tr>
+<tr>
+<td>log.dir</td><td>string</td><td>/tmp/kafka-logs</td><td></td><td>high</td><td>The directory in which the log data is kept (supplemental for log.dirs property)</td></tr>
+<tr>
+<td>log.dirs</td><td>string</td><td>null</td><td></td><td>high</td><td>The directories in which the log data is kept. If not set, the value in log.dir is used</td></tr>
+<tr>
+<td>log.flush.interval.messages</td><td>long</td><td>9223372036854775807</td><td>[1,...]</td><td>high</td><td>The number of messages accumulated on a log partition before messages are flushed to disk </td></tr>
+<tr>
+<td>log.flush.interval.ms</td><td>long</td><td>null</td><td></td><td>high</td><td>The maximum time in ms that a message in any topic is kept in memory before flushed to disk. If not set, the value in log.flush.scheduler.interval.ms is used</td></tr>
+<tr>
+<td>log.flush.offset.checkpoint.interval.ms</td><td>int</td><td>60000</td><td>[0,...]</td><td>high</td><td>The frequency with which we update the persistent record of the last flush which acts as the log recovery point</td></tr>
+<tr>
+<td>log.flush.scheduler.interval.ms</td><td>long</td><td>9223372036854775807</td><td></td><td>high</td><td>The frequency in ms that the log flusher checks whether any log needs to be flushed to disk</td></tr>
+<tr>
+<td>log.retention.bytes</td><td>long</td><td>-1</td><td></td><td>high</td><td>The maximum size of the log before deleting it</td></tr>
+<tr>
+<td>log.retention.hours</td><td>int</td><td>168</td><td></td><td>high</td><td>The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property</td></tr>
+<tr>
+<td>log.retention.minutes</td><td>int</td><td>null</td><td></td><td>high</td><td>The number of minutes to keep a log file before deleting it (in minutes), secondary to log.retention.ms property. If not set, the value in log.retention.hours is used</td></tr>
+<tr>
+<td>log.retention.ms</td><td>long</td><td>null</td><td></td><td>high</td><td>The number of milliseconds to keep a log file before deleting it. If not set, the value in log.retention.minutes is used</td></tr>
+<tr>
+<td>log.roll.hours</td><td>int</td><td>168</td><td>[1,...]</td><td>high</td><td>The maximum time before a new log segment is rolled out (in hours), secondary to log.roll.ms property</td></tr>
+<tr>
+<td>log.roll.jitter.hours</td><td>int</td><td>0</td><td>[0,...]</td><td>high</td><td>The maximum jitter to subtract from logRollTimeMillis (in hours), secondary to log.roll.jitter.ms property</td></tr>
+<tr>
+<td>log.roll.jitter.ms</td><td>long</td><td>null</td><td></td><td>high</td><td>The maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in log.roll.jitter.hours is used</td></tr>
+<tr>
+<td>log.roll.ms</td><td>long</td><td>null</td><td></td><td>high</td><td>The maximum time before a new log segment is rolled out (in milliseconds). If not set, the value in log.roll.hours is used</td></tr>
+<tr>
+<td>log.segment.bytes</td><td>int</td><td>1073741824</td><td>[14,...]</td><td>high</td><td>The maximum size of a single log file</td></tr>
+<tr>
+<td>log.segment.delete.delay.ms</td><td>long</td><td>60000</td><td>[0,...]</td><td>high</td><td>The amount of time to wait before deleting a file from the filesystem</td></tr>
+<tr>
+<td>message.max.bytes</td><td>int</td><td>1000012</td><td>[0,...]</td><td>high</td><td>The maximum size of message that the server can receive</td></tr>
+<tr>
+<td>min.insync.replicas</td><td>int</td><td>1</td><td>[1,...]</td><td>high</td><td>define the minimum number of replicas in ISR needed to satisfy a produce request with required.acks=-1 (or all)</td></tr>
+<tr>
+<td>num.io.threads</td><td>int</td><td>8</td><td>[1,...]</td><td>high</td><td>The number of io threads that the server uses for carrying out network requests</td></tr>
+<tr>
+<td>num.network.threads</td><td>int</td><td>3</td><td>[1,...]</td><td>high</td><td>the number of network threads that the server uses for handling network requests</td></tr>
+<tr>
+<td>num.recovery.threads.per.data.dir</td><td>int</td><td>1</td><td>[1,...]</td><td>high</td><td>The number of threads per data directory to be used for log recovery at startup and flushing at shutdown</td></tr>
+<tr>
+<td>num.replica.fetchers</td><td>int</td><td>1</td><td></td><td>high</td><td>Number of fetcher threads used to replicate messages from a source broker. Increasing this value can increase the degree of I/O parallelism in the follower broker.</td></tr>
+<tr>
+<td>offset.metadata.max.bytes</td><td>int</td><td>4096</td><td></td><td>high</td><td>The maximum size for a metadata entry associated with an offset commit</td></tr>
+<tr>
+<td>offsets.commit.required.acks</td><td>short</td><td>-1</td><td></td><td>high</td><td>The required acks before the commit can be accepted. In general, the default (-1) should not be overridden</td></tr>
+<tr>
+<td>offsets.commit.timeout.ms</td><td>int</td><td>5000</td><td>[1,...]</td><td>high</td><td>Offset commit will be delayed until all replicas for the offsets topic receive the commit or this timeout is reached. This is similar to the producer request timeout.</td></tr>
+<tr>
+<td>offsets.load.buffer.size</td><td>int</td><td>5242880</td><td>[1,...]</td><td>high</td><td>Batch size for reading from the offsets segments when loading offsets into the cache.</td></tr>
+<tr>
+<td>offsets.retention.check.interval.ms</td><td>long</td><td>600000</td><td>[1,...]</td><td>high</td><td>Frequency at which to check for stale offsets</td></tr>
+<tr>
+<td>offsets.retention.minutes</td><td>int</td><td>1440</td><td>[1,...]</td><td>high</td><td>Log retention window in minutes for offsets topic</td></tr>
+<tr>
+<td>offsets.topic.compression.codec</td><td>int</td><td>0</td><td></td><td>high</td><td>Compression codec for the offsets topic - compression may be used to achieve "atomic" commits</td></tr>
+<tr>
+<td>offsets.topic.num.partitions</td><td>int</td><td>50</td><td>[1,...]</td><td>high</td><td>The number of partitions for the offset commit topic (should not change after deployment)</td></tr>
+<tr>
+<td>offsets.topic.replication.factor</td><td>short</td><td>3</td><td>[1,...]</td><td>high</td><td>The replication factor for the offsets topic (set higher to ensure availability). To ensure that the effective replication factor of the offsets topic is the configured value, the number of alive brokers has to be at least the replication factor at the time of the first request for the offsets topic. If not, either the offsets topic creation will fail or it will get a replication factor of min(alive brokers, configured replication factor)</td></tr>
+<tr>
+<td>offsets.topic.segment.bytes</td><td>int</td><td>104857600</td><td>[1,...]</td><td>high</td><td>The offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loads</td></tr>
+<tr>
+<td>port</td><td>int</td><td>9092</td><td></td><td>high</td><td>the port to listen and accept connections on</td></tr>
+<tr>
+<td>queued.max.requests</td><td>int</td><td>500</td><td>[1,...]</td><td>high</td><td>The number of queued requests allowed before blocking the network threads</td></tr>
+<tr>
+<td>quota.consumer.default</td><td>long</td><td>9223372036854775807</td><td>[1,...]</td><td>high</td><td>Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second</td></tr>
+<tr>
+<td>quota.producer.default</td><td>long</td><td>9223372036854775807</td><td>[1,...]</td><td>high</td><td>Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second</td></tr>
+<tr>
+<td>replica.fetch.max.bytes</td><td>int</td><td>1048576</td><td></td><td>high</td><td>The number of bytes of messages to attempt to fetch</td></tr>
+<tr>
+<td>replica.fetch.min.bytes</td><td>int</td><td>1</td><td></td><td>high</td><td>Minimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMs</td></tr>
+<tr>
+<td>replica.fetch.wait.max.ms</td><td>int</td><td>500</td><td></td><td>high</td><td>The maximum wait time for each fetcher request issued by follower replicas. This value should always be less than replica.lag.time.max.ms to prevent frequent shrinking of ISR for low throughput topics</td></tr>
+<tr>
+<td>replica.high.watermark.checkpoint.interval.ms</td><td>long</td><td>5000</td><td></td><td>high</td><td>The frequency with which the high watermark is saved out to disk</td></tr>
+<tr>
+<td>replica.lag.time.max.ms</td><td>long</td><td>10000</td><td></td><td>high</td><td>If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr</td></tr>
+<tr>
+<td>replica.socket.receive.buffer.bytes</td><td>int</td><td>65536</td><td></td><td>high</td><td>The socket receive buffer for network requests</td></tr>
+<tr>
+<td>replica.socket.timeout.ms</td><td>int</td><td>30000</td><td></td><td>high</td><td>The socket timeout for network requests. Its value should be at least replica.fetch.wait.max.ms</td></tr>
+<tr>
+<td>request.timeout.ms</td><td>int</td><td>30000</td><td></td><td>high</td><td>The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.</td></tr>
+<tr>
+<td>socket.receive.buffer.bytes</td><td>int</td><td>102400</td><td></td><td>high</td><td>The SO_RCVBUF buffer of the socket server sockets</td></tr>
+<tr>
+<td>socket.request.max.bytes</td><td>int</td><td>104857600</td><td>[1,...]</td><td>high</td><td>The maximum number of bytes in a socket request</td></tr>
+<tr>
+<td>socket.send.buffer.bytes</td><td>int</td><td>102400</td><td></td><td>high</td><td>The SO_SNDBUF buffer of the socket server sockets</td></tr>
+<tr>
+<td>unclean.leader.election.enable</td><td>boolean</td><td>true</td><td></td><td>high</td><td>Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss</td></tr>
+<tr>
+<td>zookeeper.connection.timeout.ms</td><td>int</td><td>null</td><td></td><td>high</td><td>The max time that the client waits to establish a connection to zookeeper. If not set, the value in zookeeper.session.timeout.ms is used</td></tr>
+<tr>
+<td>zookeeper.session.timeout.ms</td><td>int</td><td>6000</td><td></td><td>high</td><td>Zookeeper session timeout</td></tr>
+<tr>
+<td>zookeeper.set.acl</td><td>boolean</td><td>false</td><td></td><td>high</td><td>Set client to use secure ACLs</td></tr>
+<tr>
+<td>connections.max.idle.ms</td><td>long</td><td>600000</td><td></td><td>medium</td><td>Idle connections timeout: the server socket processor threads close the connections that idle more than this</td></tr>
+<tr>
+<td>controlled.shutdown.enable</td><td>boolean</td><td>true</td><td></td><td>medium</td><td>Enable controlled shutdown of the server</td></tr>
+<tr>
+<td>controlled.shutdown.max.retries</td><td>int</td><td>3</td><td></td><td>medium</td><td>Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens</td></tr>
+<tr>
+<td>controlled.shutdown.retry.backoff.ms</td><td>long</td><td>5000</td><td></td><td>medium</td><td>Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying.</td></tr>
+<tr>
+<td>controller.socket.timeout.ms</td><td>int</td><td>30000</td><td></td><td>medium</td><td>The socket timeout for controller-to-broker channels</td></tr>
+<tr>
+<td>default.replication.factor</td><td>int</td><td>1</td><td></td><td>medium</td><td>The default replication factor for automatically created topics</td></tr>
+<tr>
+<td>fetch.purgatory.purge.interval.requests</td><td>int</td><td>1000</td><td></td><td>medium</td><td>The purge interval (in number of requests) of the fetch request purgatory</td></tr>
+<tr>
+<td>group.max.session.timeout.ms</td><td>int</td><td>30000</td><td></td><td>medium</td><td>The maximum allowed session timeout for registered consumers</td></tr>
+<tr>
+<td>group.min.session.timeout.ms</td><td>int</td><td>6000</td><td></td><td>medium</td><td>The minimum allowed session timeout for registered consumers</td></tr>
+<tr>
+<td>inter.broker.protocol.version</td><td>string</td><td>0.9.0.X</td><td></td><td>medium</td><td>Specify which version of the inter-broker protocol will be used.
+ This is typically bumped after all brokers were upgraded to a new version.
+ Examples of valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1. Check ApiVersion for the full list.</td></tr>
+<tr>
+<td>log.cleaner.backoff.ms</td><td>long</td><td>15000</td><td>[0,...]</td><td>medium</td><td>The amount of time to sleep when there are no logs to clean</td></tr>
+<tr>
+<td>log.cleaner.dedupe.buffer.size</td><td>long</td><td>524288000</td><td></td><td>medium</td><td>The total memory used for log deduplication across all cleaner threads</td></tr>
+<tr>
+<td>log.cleaner.delete.retention.ms</td><td>long</td><td>86400000</td><td></td><td>medium</td><td>How long are delete records retained?</td></tr>
+<tr>
+<td>log.cleaner.enable</td><td>boolean</td><td>false</td><td></td><td>medium</td><td>Should we enable log cleaning?</td></tr>
+<tr>
+<td>log.cleaner.io.buffer.load.factor</td><td>double</td><td>0.9</td><td></td><td>medium</td><td>Log cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value will allow more log to be cleaned at once but will lead to more hash collisions</td></tr>
+<tr>
+<td>log.cleaner.io.buffer.size</td><td>int</td><td>524288</td><td>[0,...]</td><td>medium</td><td>The total memory used for log cleaner I/O buffers across all cleaner threads</td></tr>
+<tr>
+<td>log.cleaner.io.max.bytes.per.second</td><td>double</td><td>1.7976931348623157E308</td><td></td><td>medium</td><td>The log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on average</td></tr>
+<tr>
+<td>log.cleaner.min.cleanable.ratio</td><td>double</td><td>0.5</td><td></td><td>medium</td><td>The minimum ratio of dirty log to total log for a log to be eligible for cleaning</td></tr>
+<tr>
+<td>log.cleaner.threads</td><td>int</td><td>1</td><td>[0,...]</td><td>medium</td><td>The number of background threads to use for log cleaning</td></tr>
+<tr>
+<td>log.cleanup.policy</td><td>string</td><td>delete</td><td>[compact, delete]</td><td>medium</td><td>The default cleanup policy for segments beyond the retention window, must be either "delete" or "compact"</td></tr>
+<tr>
+<td>log.index.interval.bytes</td><td>int</td><td>4096</td><td>[0,...]</td><td>medium</td><td>The interval with which we add an entry to the offset index</td></tr>
+<tr>
+<td>log.index.size.max.bytes</td><td>int</td><td>10485760</td><td>[4,...]</td><td>medium</td><td>The maximum size in bytes of the offset index</td></tr>
+<tr>
+<td>log.preallocate</td><td>boolean</td><td>false</td><td></td><td>medium</td><td>Whether to preallocate the file when creating a new segment. If you are using Kafka on Windows, you probably need to set it to true.</td></tr>
+<tr>
+<td>log.retention.check.interval.ms</td><td>long</td><td>300000</td><td>[1,...]</td><td>medium</td><td>The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion</td></tr>
+<tr>
+<td>max.connections.per.ip</td><td>int</td><td>2147483647</td><td>[1,...]</td><td>medium</td><td>The maximum number of connections we allow from each ip address</td></tr>
+<tr>
+<td>max.connections.per.ip.overrides</td><td>string</td><td>""</td><td></td><td>medium</td><td>Per-ip or hostname overrides to the default maximum number of connections</td></tr>
+<tr>
+<td>num.partitions</td><td>int</td><td>1</td><td>[1,...]</td><td>medium</td><td>The default number of log partitions per topic</td></tr>
+<tr>
+<td>principal.builder.class</td><td>class</td><td>class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder</td><td></td><td>medium</td><td>principal builder to generate a java Principal. This config is optional for client.</td></tr>
+<tr>
+<td>producer.purgatory.purge.interval.requests</td><td>int</td><td>1000</td><td></td><td>medium</td><td>The purge interval (in number of requests) of the producer request purgatory</td></tr>
+<tr>
+<td>replica.fetch.backoff.ms</td><td>int</td><td>1000</td><td>[0,...]</td><td>medium</td><td>The amount of time to sleep when fetch partition error occurs.</td></tr>
+<tr>
+<td>reserved.broker.max.id</td><td>int</td><td>1000</td><td>[0,...]</td><td>medium</td><td>The maximum number that can be used for a user-configured broker.id</td></tr>
+<tr>
+<td>sasl.kerberos.kinit.cmd</td><td>string</td><td>/usr/bin/kinit</td><td></td><td>medium</td><td>Kerberos kinit command path. Default is /usr/bin/kinit</td></tr>
+<tr>
+<td>sasl.kerberos.min.time.before.relogin</td><td>long</td><td>60000</td><td></td><td>medium</td><td>Login thread sleep time between refresh attempts.</td></tr>
+<tr>
+<td>sasl.kerberos.principal.to.local.rules</td><td>list</td><td>[DEFAULT]</td><td></td><td>medium</td><td>A list of rules for mapping from principal names to short names (typically operating system usernames). The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. By default, principal names of the form &lt;username&gt;/&lt;hostname&gt;@&lt;REALM&gt; are mapped to &lt;username&gt;.</td></tr>
+<tr>
+<td>sasl.kerberos.service.name</td><td>string</td><td>null</td><td></td><td>medium</td><td>The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config.</td></tr>
+<tr>
+<td>sasl.kerberos.ticket.renew.jitter</td><td>double</td><td>0.05</td><td></td><td>medium</td><td>Percentage of random jitter added to the renewal time.</td></tr>
+<tr>
+<td>sasl.kerberos.ticket.renew.window.factor</td><td>double</td><td>0.8</td><td></td><td>medium</td><td>Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket.</td></tr>
+<tr>
+<td>security.inter.broker.protocol</td><td>string</td><td>PLAINTEXT</td><td></td><td>medium</td><td>Security protocol used to communicate between brokers. Defaults to plain text.</td></tr>
+<tr>
+<td>ssl.cipher.suites</td><td>list</td><td>null</td><td></td><td>medium</td><td>A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported.</td></tr>
+<tr>
+<td>ssl.client.auth</td><td>string</td><td>none</td><td>[required, requested, none]</td><td>medium</td><td>Configures the kafka broker to request client authentication. The following settings are common: <ul> <li><code>ssl.client.auth=required</code> If set to required, client authentication is required. <li><code>ssl.client.auth=requested</code> This means client authentication is optional. Unlike required, with this option the client can choose not to provide authentication information about itself. <li><code>ssl.client.auth=none</code> This means client authentication is not needed.</ul></td></tr>
+<tr>
+<td>ssl.enabled.protocols</td><td>list</td><td>[TLSv1.2, TLSv1.1, TLSv1]</td><td></td><td>medium</td><td>The list of protocols enabled for SSL connections. TLSv1.2, TLSv1.1 and TLSv1 are enabled by default.</td></tr>
+<tr>
+<td>ssl.key.password</td><td>string</td><td>null</td><td></td><td>medium</td><td>The password of the private key in the key store file. This is optional for client.</td></tr>
+<tr>
+<td>ssl.keymanager.algorithm</td><td>string</td><td>SunX509</td><td></td><td>medium</td><td>The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.</td></tr>
+<tr>
+<td>ssl.keystore.location</td><td>string</td><td>null</td><td></td><td>medium</td><td>The location of the key store file. This is optional for client and can be used for two-way authentication for client.</td></tr>
+<tr>
+<td>ssl.keystore.password</td><td>string</td><td>null</td><td></td><td>medium</td><td>The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured.</td></tr>
+<tr>
+<td>ssl.keystore.type</td><td>string</td><td>JKS</td><td></td><td>medium</td><td>The file format of the key store file. This is optional for client. Default value is JKS</td></tr>
+<tr>
+<td>ssl.protocol</td><td>string</td><td>TLS</td><td></td><td>medium</td><td>The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.</td></tr>
+<tr>
+<td>ssl.provider</td><td>string</td><td>null</td><td></td><td>medium</td><td>The name of the security provider used for SSL connections. Default value is the default security provider of the JVM.</td></tr>
+<tr>
+<td>ssl.trustmanager.algorithm</td><td>string</td><td>PKIX</td><td></td><td>medium</td><td>The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.</td></tr>
+<tr>
+<td>ssl.truststore.location</td><td>string</td><td>null</td><td></td><td>medium</td><td>The location of the trust store file. </td></tr>
+<tr>
+<td>ssl.truststore.password</td><td>string</td><td>null</td><td></td><td>medium</td><td>The password for the trust store file. </td></tr>
+<tr>
+<td>ssl.truststore.type</td><td>string</td><td>JKS</td><td></td><td>medium</td><td>The file format of the trust store file. Default value is JKS.</td></tr>
+<tr>
+<td>authorizer.class.name</td><td>string</td><td>""</td><td></td><td>low</td><td>The authorizer class that should be used for authorization</td></tr>
+<tr>
+<td>metric.reporters</td><td>list</td><td>[]</td><td></td><td>low</td><td>A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.</td></tr>
+<tr>
+<td>metrics.num.samples</td><td>int</td><td>2</td><td>[1,...]</td><td>low</td><td>The number of samples maintained to compute metrics.</td></tr>
+<tr>
+<td>metrics.sample.window.ms</td><td>long</td><td>30000</td><td>[1,...]</td><td>low</td><td>The window of time over which each metrics sample is computed.</td></tr>
+<tr>
+<td>quota.window.num</td><td>int</td><td>11</td><td>[1,...]</td><td>low</td><td>The number of samples to retain in memory</td></tr>
+<tr>
+<td>quota.window.size.seconds</td><td>int</td><td>1</td><td>[1,...]</td><td>low</td><td>The time span of each sample</td></tr>
+<tr>
+<td>ssl.endpoint.identification.algorithm</td><td>string</td><td>null</td><td></td><td>low</td><td>The endpoint identification algorithm to validate server hostname using server certificate. </td></tr>
+<tr>
+<td>zookeeper.sync.time.ms</td><td>int</td><td>2000</td><td></td><td>low</td><td>How far a ZK follower can be behind a ZK leader</td></tr>
+</table>
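+<p>
+As a hedged illustration only, a minimal <code>server.properties</code> exercising a few of the high-importance settings above might look like the following (all values are examples, not recommendations):
+</p>
+<pre>
+# example values only
+broker.id=0
+listeners=PLAINTEXT://:9092
+log.dirs=/var/lib/kafka-logs
+zookeeper.connect=localhost:2181
+num.partitions=3
+</pre>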

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/migration.html
----------------------------------------------------------------------
diff --git a/090/migration.html b/090/migration.html
index 922415c..18ab6d4 100644
--- a/090/migration.html
+++ b/090/migration.html
@@ -1,3 +1,20 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+ 
+    http://www.apache.org/licenses/LICENSE-2.0
+ 
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
 <!--#include virtual="../includes/header.html" -->
 <h2>Migrating from 0.7.x to 0.8</h2>
 
@@ -14,4 +31,4 @@
     <li>Drink.
 </ol>
 
-<!--#include virtual="../includes/footer.html" -->
\ No newline at end of file
+<!--#include virtual="../includes/footer.html" -->


[3/3] kafka-site git commit: add 0.9.0 docs

Posted by ju...@apache.org.
add 0.9.0 docs


Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/8c4a140c
Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/8c4a140c
Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/8c4a140c

Branch: refs/heads/asf-site
Commit: 8c4a140cfbb841902ab6ac5c114b43c3092535d5
Parents: e4d9849
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Nov 9 17:08:09 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Nov 9 17:08:09 2015 -0800

----------------------------------------------------------------------
 090/api.html                         |  40 +-
 090/configuration.html               | 642 ++----------------------------
 090/connect.html                     | 328 +++++++++++++++
 090/connect_config.html              | 112 ++++++
 090/consumer_config.html             | 102 +++++
 090/design.html                      |  48 ++-
 090/documentation.html               |  61 ++-
 090/ecosystem.html                   |  17 +
 090/images/consumer-groups.png       | Bin 0 -> 26820 bytes
 090/images/kafka_log.png             | Bin 0 -> 134321 bytes
 090/images/kafka_multidc.png         | Bin 0 -> 33959 bytes
 090/images/kafka_multidc_complex.png | Bin 0 -> 38559 bytes
 090/images/log_anatomy.png           | Bin 0 -> 19579 bytes
 090/images/log_cleaner_anatomy.png   | Bin 0 -> 18638 bytes
 090/images/log_compaction.png        | Bin 0 -> 41414 bytes
 090/images/mirror-maker.png          | Bin 0 -> 6579 bytes
 090/images/producer_consumer.png     | Bin 0 -> 8691 bytes
 090/images/tracking_high_level.png   | Bin 0 -> 82759 bytes
 090/implementation.html              | 103 +++--
 090/introduction.html                |  23 +-
 090/kafka_config.html                | 268 +++++++++++++
 090/migration.html                   |  19 +-
 090/ops.html                         |  56 ++-
 090/producer_config.html             | 106 +++++
 090/quickstart.html                  |  79 ++++
 090/security.html                    | 301 ++++++++++++++
 090/upgrade.html                     |  43 +-
 090/uses.html                        |  17 +
 28 files changed, 1686 insertions(+), 679 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/api.html
----------------------------------------------------------------------
diff --git a/090/api.html b/090/api.html
index 63dd8a3..835bdf2 100644
--- a/090/api.html
+++ b/090/api.html
@@ -1,9 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
 
 We are in the process of rewriting the JVM clients for Kafka. As of 0.8.2 Kafka includes a newly rewritten Java producer. The next release will include an equivalent Java consumer. These new clients are meant to supplant the existing Scala clients, but for compatibility they will co-exist for some time. These clients are available in a separate jar with minimal dependencies, while the old Scala clients remain packaged with the server.
 
 <h3><a id="producerapi">2.1 Producer API</a></h3>
 
-As of the 0.8.2 release we encourage all new development to use the new Java producer. This client is production tested and generally both faster and more fully featured than the previous Scala client. You can use this client by adding a dependency on the client jar using the following maven co-ordinates:
+As of the 0.8.2 release we encourage all new development to use the new Java producer. This client is production tested and generally both faster and more fully featured than the previous Scala client. You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):
 <pre>
 	&lt;dependency&gt;
 	    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
@@ -12,7 +28,7 @@ As of the 0.8.2 release we encourage all new development to use the new Java pro
 	&lt;/dependency&gt;
 </pre>
 
-Examples showing how to use the producer are given in the 
+Examples showing how to use the producer are given in the
 <a href="http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html" title="Kafka 0.8.2 Javadoc">javadocs</a>.
 
 <p>
@@ -117,7 +133,7 @@ class kafka.javaapi.consumer.SimpleConsumer {
    *  @param request a [[kafka.javaapi.OffsetRequest]] object.
    *  @return a [[kafka.javaapi.OffsetResponse]] object.
    */
-  public kafak.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);
+  public kafka.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);
 
   /**
    * Close the SimpleConsumer.
@@ -128,11 +144,15 @@ class kafka.javaapi.consumer.SimpleConsumer {
 For most applications, the high-level consumer API is good enough. Some applications want features not exposed to the high-level consumer yet (e.g., setting the initial offset when restarting the consumer). They can instead use our low-level SimpleConsumer API. The logic will be a bit more complicated and you can follow the example
 <a href="https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example" title="Kafka 0.8 SimpleConsumer example">here</a>.
 
-<h3><a id="kafkahadoopconsumerapi">2.4 Kafka Hadoop Consumer API</a></h3>
-<p>
-Providing a horizontally scalable solution for aggregating and loading data into Hadoop was one of our basic use cases. To support this use case, we provide a Hadoop-based consumer which spawns off many map tasks to pull data from the Kafka cluster in parallel. This provides extremely fast pull-based Hadoop data load capabilities (we were able to fully saturate the network with only a handful of Kafka servers).
-</p>
+<h3><a id="newconsumerapi">2.4 New Consumer API</a></h3>
+As of the 0.9.0 release we have added a replacement for our existing simple and high-level consumers. This client is considered beta quality. You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):
+<pre>
+	&lt;dependency&gt;
+	    &lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
+	    &lt;artifactId&gt;kafka-clients&lt;/artifactId&gt;
+	    &lt;version&gt;0.9.0.0&lt;/version&gt;
+	&lt;/dependency&gt;
+</pre>
 
-<p>
-Usage information on the hadoop consumer can be found <a href="https://github.com/linkedin/camus/">here</a>.
-</p>
+Examples showing how to use the consumer are given in the
+<a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html" title="Kafka 0.9.0 Javadoc">javadocs</a>.
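+<p>
+As a hedged sketch of a simple poll loop with the new consumer (the javadocs are the authoritative reference), it might look roughly like the following; the broker address, group id and topic name are placeholders:
+</p>
+<pre>
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+Properties props = new Properties();
+props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
+props.put("group.id", "my-group");                  // placeholder group id
+props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
+consumer.subscribe(Arrays.asList("my-topic"));
+while (true) {
+    ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);   // poll timeout in ms
+    for (ConsumerRecord&lt;String, String&gt; record : records)
+        System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
+}
+</pre>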

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/configuration.html
----------------------------------------------------------------------
diff --git a/090/configuration.html b/090/configuration.html
index 9ef621f..abaff63 100644
--- a/090/configuration.html
+++ b/090/configuration.html
@@ -1,3 +1,20 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
 Kafka uses key-value pairs in the <a href="http://en.wikipedia.org/wiki/.properties">property file format</a> for configuration. These values can be supplied either from a file or programmatically.
 
 <h3><a id="brokerconfigs">3.1 Broker Configs</a></h3>
@@ -11,431 +28,11 @@ The essential configurations are the following:
 
 Topic-level configurations and defaults are discussed in more detail <a href="#topic-config">below</a>.
 
-<table class="data-table">
-<tbody><tr>
-      <th>Property</th>
-      <th>Default</th>
-      <th>Description</th>
-    </tr>
-    <tr>
-      <td>broker.id</td>
-      <td></td>
-      <td>Each broker is uniquely identified by a non-negative integer id. This id serves as the broker's "name" and allows the broker to be moved to a different host/port without confusing consumers. You can choose any number you like so long as it is unique.
-    </td>
-    </tr>
-    <tr>
-      <td>log.dirs</td>
-      <td nowrap>/tmp/kafka-logs</td>
-      <td>A comma-separated list of one or more directories in which Kafka data is stored. Each new partition that is created will be placed in the directory which currently has the fewest partitions.</td>
-    </tr>
-    <tr>
-      <td>port</td>
-      <td>9092</td>
-      <td>The port on which the server accepts client connections.</td>
-    </tr>
-    <tr>
-      <td>zookeeper.connect</td>
-      <td>null</td>
-      <td>Specifies the ZooKeeper connection string in the form <code>hostname:port</code>, where hostname and port are the host and port for a node in your ZooKeeper cluster. To allow connecting through other ZooKeeper nodes when that host is down you can also specify multiple hosts in the form <code>hostname1:port1,hostname2:port2,hostname3:port3</code>.
-    <p>
-ZooKeeper also allows you to add a "chroot" path which will make all kafka data for this cluster appear under a particular path. This is a way to setup multiple Kafka clusters or other applications on the same ZooKeeper cluster. To do this give a connection string in the form <code>hostname1:port1,hostname2:port2,hostname3:port3/chroot/path</code> which would put all this cluster's data under the path <code>/chroot/path</code>. Note that consumers must use the same connection string.</td>
-    </tr>
-    <tr>
-      <td>message.max.bytes</td>
-      <td>1000000</td>
-      <td>The maximum size of a message that the server can receive. It is important that this property be in sync with the maximum fetch size your consumers use or else an unruly producer will be able to publish messages too large for consumers to consume.</td>
-    </tr>
-    <tr>
-      <td>num.network.threads</td>
-      <td>3</td>
-      <td>The number of network threads that the server uses for handling network requests. You probably don't need to change this.</td>
-    </tr>
-    <tr>
-      <td>num.io.threads</td>
-      <td>8</td>
-      <td>The number of I/O threads that the server uses for executing requests. You should have at least as many threads as you have disks.</td>
-    </tr>
-    <tr>
-      <td>background.threads</td>
-      <td>10</td>
-      <td>The number of threads to use for various background processing tasks such as file deletion. You should not need to change this.</td>
-    </tr>
-    <tr>
-      <td>queued.max.requests</td>
-      <td>500</td>
-      <td>The number of requests that can be queued up for processing by the I/O threads before the network threads stop reading in new requests.</td>
-    </tr>
-    <tr>
-      <td>host.name</td>
-      <td>null</td>
-      <td>
-        <p>Hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces, and publish one to ZK.</p>
-     </td>
-    </tr>
-    <tr>
-      <td>advertised.host.name</td>
-      <td>null</td>
-      <td>
-        <p>If this is set this is the hostname that will be given out to producers, consumers, and other brokers to connect to.</p>
-     </td>
-    </tr>
-    <tr>
-      <td>advertised.port</td>
-      <td>null</td>
-      <td>
-        <p>The port to give out to producers, consumers, and other brokers to use in establishing connections. This only needs to be set if this port is different from the port the server should bind to.</p>
-     </td>
-    </tr>
-    <tr>
-      <td>socket.send.buffer.bytes</td>
-      <td>100 * 1024</td>
-      <td>The SO_SNDBUFF buffer the server prefers for socket connections.</td>
-    </tr>
-    <tr>
-      <td>socket.receive.buffer.bytes</td>
-      <td>100 * 1024</td>
-      <td>The SO_RCVBUFF buffer the server prefers for socket connections.</td>
-    </tr>
-    <tr>
-      <td>socket.request.max.bytes</td>
-      <td>100 * 1024 * 1024</td>
-      <td>The maximum request size the server will allow. This prevents the server from running out of memory and should be smaller than the Java heap size.</td>
-    </tr>
-    <tr>
-      <td>num.partitions</td>
-      <td>1</td>
-      <td>The default number of partitions per topic if a partition count isn't given at topic creation time.</td>
-    </tr>
-    <tr>
-      <td>log.segment.bytes</td>
-      <td nowrap>1024 * 1024 * 1024</td>
-      <td>The log for a topic partition is stored as a directory of segment files. This setting controls the size to which a segment file will grow before a new segment is rolled over in the log. This setting can be overridden on a per-topic basis (see <a href="#topic-config">the per-topic configuration section</a>).</td>
-    </tr>
-    <tr>
-      <td>log.roll.{ms,hours}</td>
-      <td>24 * 7 hours</td>
-      <td>This setting will force Kafka to roll a new log segment even if the log.segment.bytes size has not been reached. This setting can be overridden on a per-topic basis (see <a href="#topic-config">the per-topic configuration section</a>).</td>
-    </tr>
-    <tr>
-      <td>log.cleanup.policy</td>
-      <td>delete</td>
-      <td>This can take either the value <i>delete</i> or <i>compact</i>. If <i>delete</i> is set, log segments will be deleted when they reach the size or time limits set. If <i>compact</i> is set <a href="#compaction">log compaction</a> will be used to clean out obsolete records. This setting can be overridden on a per-topic basis (see <a href="#topic-config">the per-topic configuration section</a>).</td>
-    </tr>
-    <tr>
-      <td>log.retention.{ms,minutes,hours}</td>
-      <td>7 days</td>
-      <td>The amount of time to keep a log segment before it is deleted, i.e. the default data retention window for all topics. Note that if both log.retention.minutes and log.retention.bytes are both set we delete a segment when either limit is exceeded. This setting can be overridden on a per-topic basis (see <a href="#topic-config">the per-topic configuration section</a>).</td>
-    </tr>
-    <tr>
-      <td>log.retention.bytes</td>
-      <td>-1</td>
-      <td>The amount of data to retain in the log for each topic-partitions. Note that this is the limit per-partition so multiply by the number of partitions to get the total data retained for the topic. Also note that if both log.retention.hours and log.retention.bytes are both set we delete a segment when either limit is exceeded. This setting can be overridden on a per-topic basis (see <a href="#topic-config">the per-topic configuration section</a>).</td>
-    </tr>
-    <tr>
-      <td>log.retention.check.interval.ms</td>
-      <td>5 minutes</td>
-      <td>The period with which we check whether any log segment is eligible for deletion to meet the retention policies.</td>
-    </tr>
-    <tr>
-      <td>log.cleaner.enable</td>
-      <td>false</td>
-      <td>This configuration must be set to true for log compaction to run.</td>
-    </tr>
-    <tr>
-      <td>log.cleaner.threads</td>
-      <td>1</td>
-      <td>The number of threads to use for cleaning logs in log compaction.</td>
-    </tr>
-    <tr>
-      <td>log.cleaner.io.max.bytes.per.second</td>
-      <td>Double.MaxValue</td>
-      <td>The maximum amount of I/O the log cleaner can do while performing log compaction. This setting allows setting a limit for the cleaner to avoid impacting live request serving.</td>
-    </tr>
-    <tr>
-      <td>log.cleaner.dedupe.buffer.size</td>
-      <td>500*1024*1024</td>
-      <td>The size of the buffer the log cleaner uses for indexing and deduplicating logs during cleaning. Larger is better provided you have sufficient memory.</td>
-    </tr>
-    <tr>
-      <td>log.cleaner.io.buffer.size</td>
-      <td>512*1024</td>
-      <td>The size of the I/O chunk used during log cleaning. You probably don't need to change this.</td>
-    </tr>
-    <tr>
-      <td>log.cleaner.io.buffer.load.factor</td>
-      <td>0.9</td>
-      <td>The load factor of the hash table used in log cleaning. You probably don't need to change this.</td>
-    </tr>
-    <tr>
-      <td>log.cleaner.backoff.ms</td>
-      <td>15000</td>
-      <td>The interval between checks to see if any logs need cleaning.</td>
-    </tr>
-    <tr>
-      <td>log.cleaner.min.cleanable.ratio</td>
-      <td>0.5</td>
-      <td>This configuration controls how frequently the log compactor will attempt to clean the log (assuming <a href="#compaction">log compaction</a> is enabled). By default we will avoid cleaning a log where more than 50% of the log has been compacted. This ratio bounds the maximum space wasted in the log by duplicates (at 50% at most 50% of the log could be duplicates). A higher ratio will mean fewer, more efficient cleanings but will mean more wasted space in the log. This setting can be overridden on a per-topic basis (see <a href="#topic-config">the per-topic configuration section</a>).</td>
-    </tr>
-    <tr>
-      <td>log.cleaner.delete.retention.ms</td>
-      <td>1 day</td>
-      <td>The amount of time to retain delete tombstone markers for <a href="#compaction">log compacted</a> topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise delete tombstones may be collected before they complete their scan). This setting can be overridden on a per-topic basis (see <a href="#topic-config">the per-topic configuration section</a>).</td>
-    </tr>
-    <tr>
-      <td>log.index.size.max.bytes</td>
-      <td>10 * 1024 * 1024</td>
-      <td>The maximum size in bytes we allow for the offset index for each log segment. Note that we will always pre-allocate a sparse file with this much space and shrink it down when the log rolls. If the index fills up we will roll a new log segment even if we haven't reached the log.segment.bytes limit. This setting can be overridden on a per-topic basis (see <a href="#topic-config">the per-topic configuration section</a>).</td>
-    </tr>
-    <tr>
-      <td>log.index.interval.bytes</td>
-      <td>4096</td>
-      <td>The byte interval at which we add an entry to the offset index. When executing a fetch request the server must do a linear scan for up to this many bytes to find the correct position in the log to begin and end the fetch. So setting this value to be larger will mean larger index files (and a bit more memory usage) but less scanning. However the server will never add more than one index entry per log append (even if more than log.index.interval worth of messages are appended). In general you probably don't need to mess with this value.</td>
-    </tr>
-    <tr>
-      <td>log.flush.interval.messages</td>
-      <td>Long.MaxValue</td>
-      <td>The number of messages written to a log partition before we force an fsync on the log. Setting this lower will sync data to disk more often but will have a major impact on performance. We generally recommend that people make use of replication for durability rather than depending on single-server fsync, however this setting can be used to be extra certain.</td>
-    </tr>
-    <tr>
-      <td>log.flush.scheduler.interval.ms</td>
-      <td>Long.MaxValue</td>
-      <td>The frequency in ms that the log flusher checks whether any log is eligible to be flushed to disk.</td>
-    </tr>
-    <tr>
-      <td>log.flush.interval.ms</td>
-      <td>Long.MaxValue</td>
-      <td>The maximum time between fsync calls on the log. If used in conjuction with log.flush.interval.messages the log will be flushed when either criteria is met.</td>
-    </tr>
-    <tr>
-      <td>log.delete.delay.ms</td>
-      <td>60000</td>
-      <td>The period of time we hold log files around after they are removed from the in-memory segment index. This period of time allows any in-progress reads to complete uninterrupted without locking. You generally don't need to change this.</td>
-    </tr>
-    <tr>
-      <td>log.flush.offset.checkpoint.interval.ms</td>
-      <td>60000</td>
-      <td>The frequency with which we checkpoint the last flush point for logs for recovery. You should not need to change this.</td>
-    </tr>
-    <tr>
-      <td>log.segment.delete.delay.ms</td>
-      <td>60000</td>
-      <td>the amount of time to wait before deleting a file from the filesystem.</td>
-    </tr>
-    <tr>
-      <td>auto.create.topics.enable</td>
-      <td>true</td>
-      <td>Enable auto creation of topic on the server.  If this is set to true then attempts to produce data or fetch metadata for a non-existent topic will automatically create it with the default replication factor and number of partitions.</td>
-    </tr>
-    <tr>
-      <td>controller.socket.timeout.ms</td>
-      <td>30000</td>
-      <td>The socket timeout for commands from the partition management controller to the replicas.</td>
-    </tr>
-    <tr>
-      <td>controller.message.queue.size</td>
-      <td>Int.MaxValue</td>
-      <td>The buffer size for controller-to-broker-channels</td>
-    </tr>
-    <tr>
-      <td>default.replication.factor</td>
-      <td>1</td>
-      <td>The default replication factor for automatically created topics.</td>
-    </tr>
-    <tr>
-      <td>replica.lag.time.max.ms</td>
-      <td>10000</td>
-      <td>If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead.</td>
-    </tr>
-    <tr>
-      <td>replica.socket.timeout.ms</td>
-      <td>30 * 1000</td>
-      <td>The socket timeout for network requests to the leader for replicating data.</td>
-    </tr>
-    <tr>
-      <td>replica.socket.receive.buffer.bytes</td>
-      <td>64 * 1024</td>
-      <td>The socket receive buffer for network requests to the leader for replicating data.</td>
-    </tr>
-    <tr>
-      <td>replica.fetch.max.bytes</td>
-      <td nowrap>1024 * 1024</td>
-      <td>The number of byes of messages to attempt to fetch for each partition in the fetch requests the replicas send to the leader.</td>
-    </tr>
-    <tr>
-      <td>replica.fetch.wait.max.ms</td>
-      <td>500</td>
-      <td>The maximum amount of time to wait time for data to arrive on the leader in the fetch requests sent by the replicas to the leader.</td>
-    </tr>
-    <tr>
-      <td>replica.fetch.min.bytes</td>
-      <td>1</td>
-      <td>Minimum bytes expected for each fetch response for the fetch requests from the replica to the leader. If not enough bytes, wait up to replica.fetch.wait.max.ms for this many bytes to arrive.</td>
-    </tr>
-    <tr>
-      <td>num.replica.fetchers</td>
-      <td>1</td>
-      <td>
-        <p>Number of threads used to replicate messages from leaders. Increasing this value can increase the degree of I/O parallelism in the follower broker.</p>
-     </td>
-    </tr>
-    <tr>
-      <td>replica.high.watermark.checkpoint.interval.ms</td>
-      <td>5000</td>
-      <td>The frequency with which each replica saves its high watermark to disk to handle recovery.</td>
-    </tr>
-    <tr>
-      <td>fetch.purgatory.purge.interval.requests</td>
-      <td>1000</td>
-      <td>The purge interval (in number of requests) of the fetch request purgatory.</td>
-    </tr>
-    <tr>
-      <td>producer.purgatory.purge.interval.requests</td>
-      <td>1000</td>
-      <td>The purge interval (in number of requests) of the producer request purgatory.</td>
-    </tr>
-    <tr>
-      <td>zookeeper.session.timeout.ms</td>
-      <td>6000</td>
-      <td>ZooKeeper session timeout. If the server fails to heartbeat to ZooKeeper within this period of time it is considered dead. If you set this too low the server may be falsely considered dead; if you set it too high it may take too long to recognize a truly dead server.</td>
-    </tr>
-    <tr>
-      <td>zookeeper.connection.timeout.ms</td>
-      <td>6000</td>
-      <td>The maximum amount of time that the client waits to establish a connection to zookeeper.</td>
-    </tr>
-    <tr>
-      <td>zookeeper.sync.time.ms</td>
-      <td>2000</td>
-      <td>How far a ZK follower can be behind a ZK leader.</td>
-    </tr>
-    <tr>
-      <td>controlled.shutdown.enable</td>
-      <td>true</td>
-      <td>Enable controlled shutdown of the broker. If enabled, the broker will move all leaders on it to some other brokers before shutting itself down. This reduces the unavailability window during shutdown.</td>
-    </tr>
-    <tr>
-      <td>controlled.shutdown.max.retries</td>
-      <td>3</td>
-      <td>Number of retries to complete the controlled shutdown successfully before executing an unclean shutdown.</td>
-    </tr>
-    <tr>
-      <td>controlled.shutdown.retry.backoff.ms</td>
-      <td>5000</td>
-      <td>Backoff time between shutdown retries.</td>
-    </tr>
-    <tr>
-      <td>auto.leader.rebalance.enable</td>
-      <td>true</td>
-      <td>If this is enabled the controller will automatically try to balance leadership for partitions among the brokers by periodically returning leadership to the "preferred" replica for each partition if it is available.</td>
-    </tr>
-    <tr>
-      <td>leader.imbalance.per.broker.percentage</td>
-      <td>10</td>
-      <td>The percentage of leader imbalance allowed per broker. The controller will rebalance leadership if this ratio goes above
-       the configured value per broker.</td>
-    </tr>
-    <tr>
-      <td>leader.imbalance.check.interval.seconds</td>
-      <td>300</td>
-      <td>The frequency with which to check for leader imbalance.</td>
-    </tr>
-    <tr>
-      <td>offset.metadata.max.bytes</td>
-      <td>4096</td>
-      <td>The maximum amount of metadata to allow clients to save with their offsets.</td>
-    </tr>
-    <tr>
-      <td>max.connections.per.ip</td>
-      <td>Int.MaxValue</td>
-      <td>The maximum number of connections that a broker allows from each ip address.</td>
-    </tr>
-    <tr>
-      <td>max.connections.per.ip.overrides</td>
-      <td></td>
-      <td>Per-ip or hostname overrides to the default maximum number of connections.</td>
-    </tr>
-    <tr>
-      <td>connections.max.idle.ms</td>
-      <td>600000</td>
-      <td>Idle connections timeout: the server socket processor threads close the connections that idle more than this.</td>
-    </tr>
-    <tr>
-      <td>log.roll.jitter.{ms,hours}</td>
-      <td>0</td>
-      <td>The maximum jitter to subtract from logRollTimeMillis.</td>
-    </tr>
-    <tr>
-      <td>num.recovery.threads.per.data.dir</td>
-      <td>1</td>
-      <td>The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.</td>
-    </tr>
-    <tr>
-      <td>unclean.leader.election.enable</td>
-      <td>true</td>
-      <td>Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss.</td>
-    </tr>
-    <tr>
-      <td>delete.topic.enable</td>
-      <td>false</td>
-      <td>Enable delete topic.</td>
-    </tr>
-    <tr>
-      <td>offsets.topic.num.partitions</td>
-      <td>50</td>
-      <td>The number of partitions for the offset commit topic. Since changing this after deployment is currently unsupported, we recommend using a higher setting for production (e.g., 100-200).</td>
-    </tr>
-    <tr>
-      <td>offsets.topic.retention.minutes</td>
-      <td>1440</td>
-      <td>Offsets that are older than this age will be marked for deletion. The actual purge will occur when the log cleaner compacts the offsets topic.</td>
-    </tr>
-    <tr>
-      <td>offsets.retention.check.interval.ms</td>
-      <td>600000</td>
-      <td>The frequency at which the offset manager checks for stale offsets.</td>
-    </tr>
-    <tr>
-      <td>offsets.topic.replication.factor</td>
-      <td>3</td>
-      <td>The replication factor for the offset commit topic. A higher setting (e.g., three or four) is recommended in order to ensure higher availability. If the offsets topic is created when fewer brokers than the replication factor then the offsets topic will be created with fewer replicas.</td>
-    </tr>
-    <tr>
-      <td>offsets.topic.segment.bytes</td>
-      <td>104857600</td>
-      <td>Segment size for the offsets topic. Since it uses a compacted topic, this should be kept relatively low in order to facilitate faster log compaction and loads.</td>
-    </tr>
-    <tr>
-      <td>offsets.load.buffer.size</td>
-      <td>5242880</td>
-      <td>An offset load occurs when a broker becomes the offset manager for a set of consumer groups (i.e., when it becomes a leader for an offsets topic partition). This setting corresponds to the batch size (in bytes) to use when reading from the offsets segments when loading offsets into the offset manager's cache.</td>
-    </tr>
-<!--
-    <tr>
-      <td>offsets.topic.compression.codec</td>
-      <td>none</td>
-      <td>(Should not be used until KAFKA-1374 is implemented.) Compression codec for the offsets topic. Compression should be enabled in order to achieve "atomic" commits.</td>
-    </tr>
--->
-    <tr>
-      <td>offsets.commit.required.acks</td>
-      <td>-1</td>
-      <td>The number of acknowledgements that are required before the offset commit can be accepted. This is similar to the producer's acknowledgement setting. In general, the default should not be overridden.</td>
-    </tr>
-    <tr>
-      <td>offsets.commit.timeout.ms</td>
-      <td>5000</td>
-      <td>The offset commit will be delayed until this timeout or the required number of replicas have received the offset commit. This is similar to the producer request timeout.</td>
-    </tr>
-    <tr>
-      <td>inter.broker.protocol.version</td>
-      <td>0.8.3</td>
-      <td>Version of the protocol brokers will use to communicate with each other. This will default for the current version of the broker, but may need to be set to older versions during a rolling upgrade process. In that scenario, upgraded brokers will use the older version of the protocol and therefore will be able to communicate with brokers that were not yet upgraded. See <a href="#upgrade">upgrade section</a> for more details.</td>
-    </tr>
-</tbody></table>
+<!--#include virtual="kafka_config.html" -->
 
 <p>More details about broker configuration can be found in the scala class <code>kafka.server.KafkaConfig</code>.</p>
 
-<h4><a id="topic-config">Topic-level configuration</a></h3>
+<h4><a id="topic-config">Topic-level configuration</a></h4>
 
 Configurations pertinent to topics have both a global default as well an optional per-topic override. If no per-topic configuration is given the global default is used. The override can be set at topic creation time by giving one or more <code>--config</code> options. This example creates a topic named <i>my-topic</i> with a custom max message size and flush rate:
 <pre>
@@ -509,7 +106,7 @@ The following are the topic-level configurations. The server's default configura
       <td>min.insync.replicas</td>
       <td>1</td>
       <td>min.insync.replicas</td>
-      <td>When a producer sets request.required.acks to -1, min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend). </br>
+      <td>When a producer sets request.required.acks to -1, min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
       When used together, min.insync.replicas and request.required.acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with request.required.acks of -1. This will ensure that the producer raises an exception if a majority of replicas do not receive a write.</td>
     </tr>
     <tr>
@@ -550,7 +147,17 @@ The following are the topic-level configurations. The server's default configura
     </tr>
 </table>
 
-<h3><a id="consumerconfigs">3.2 Consumer Configs</a></h3>
+<h3><a id="producerconfigs">3.2 Producer Configs</a></h3>
+
+Below is the configuration of the Java producer:
+<!--#include virtual="producer_config.html" -->
+
+<p>
+    For those interested in the legacy Scala producer configs, information can be found <a href="http://kafka.apache.org/082/documentation.html#producerconfigs">
+    here</a>.
+</p>
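+
+<p>
+    As a brief illustration of how these configs are supplied, here is a minimal sketch that constructs the Java producer with a handful of common settings and sends a single record; the broker address, topic name, and key/value are placeholders.
+</p>
+<pre>
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public class ProducerConfigExample {
+    public static void main(String[] args) {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker list
+        props.put("acks", "all");                         // wait for the full set of in-sync replicas
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+        Producer&lt;String, String&gt; producer = new KafkaProducer&lt;&gt;(props);
+        producer.send(new ProducerRecord&lt;String, String&gt;("my-topic", "key", "value"));
+        producer.close();
+    }
+}
+</pre>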
+
+<h3><a id="consumerconfigs">3.3 Consumer Configs</a></h3>
 The essential consumer configurations are the following:
 <ul>
         <li><code>group.id</code>
@@ -719,187 +326,10 @@ The essential consumer configurations are the following:
 
 
 <p>More details about consumer configuration can be found in the scala class <code>kafka.consumer.ConsumerConfig</code>.</p>
-<h3><a id="producerconfigs">3.3 Producer Configs</a></h3>
-Essential configuration properties for the producer include:
-<ul>
-        <li><code>metadata.broker.list</code>
-        <li><code>request.required.acks</code>
-        <li><code>producer.type</code>
-        <li><code>serializer.class</code>
-</ul>
 
-<table class="data-table">
-<tbody><tr>
-        <th>Property</th>
-        <th>Default</th>
-        <th>Description</th>
-      </tr>
-    <tr>
-      <td>metadata.broker.list</td>
-      <td colspan="1"></td>
-      <td>
-        <p>This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.</p>
-     </td>
-    </tr>
-    <tr>
-      <td>request.required.acks</td>
-      <td colspan="1">0</td>
-      <td>
-        <p>This value controls when a produce request is considered completed. Specifically, how many other brokers must have committed the data to their log and acknowledged this to the leader? Typical values are
-           <ul>
-             <li>0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
-             <li> 1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
-             <li>  -1, The producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the greatest level of durability. However, it does not completely eliminate the risk of message loss because the number of in sync replicas may, in rare cases, shrink to 1. If you want to ensure that some minimum number of replicas (typically a majority) receive a write, then you must set the topic-level min.insync.replicas setting. Please read the Replication section of the design documentation for a more in-depth discussion.
-            </ul>
-        </p>
-     </td>
-    </tr>
-    <tr>
-      <td>request.timeout.ms</td>
-      <td colspan="1">10000</td>
-      <td>The amount of time the broker will wait trying to meet the request.required.acks requirement before sending back an error to the client.</td>
-    </tr>
-    <tr>
-      <td>producer.type</td>
-      <td colspan="1">sync</td>
-      <td>
-        <p>This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we allow batching together of requests (which is great for throughput) but open the possibility of a failure of the client machine dropping unsent data.</p>
-     </td>
-    <tr>
-      <td>serializer.class</td>
-      <td colspan="1">kafka.serializer.DefaultEncoder</td>
-      <td>The serializer class for messages. The default encoder takes a byte[] and returns the same byte[].</td>
-    </tr>
-    <tr>
-      <td>key.serializer.class</td>
-      <td colspan="1"></td>
-      <td>The serializer class for keys (defaults to the same as for messages if nothing is given).</td>
-    </tr>
-    <tr>
-      <td>partitioner.class</td>
-      <td colspan="1">kafka.producer.DefaultPartitioner</td>
-      <td>The partitioner class for partitioning messages amongst sub-topics. The default partitioner is based on the hash of the key.</td>
-    </tr>
-    <tr>
-      <td>compression.codec</td>
-      <td colspan="1">none</td>
-      <td>
-        <p>This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are "none", "gzip" and "snappy".</p>
-     </td>
-    </tr>
-    <tr>
-      <td>compressed.topics</td>
-      <td colspan="1">null</td>
-      <td>
-        <p>This parameter allows you to set whether compression should be turned on for particular topics. If the compression codec is anything other than NoCompressionCodec, enable compression only for specified topics if any. If the list of compressed topics is empty, then enable the specified compression codec for all topics. If the compression codec is NoCompressionCodec, compression is disabled for all topics</p>
-     </td>
-    </tr>
-    <tr>
-      <td>message.send.max.retries</td>
-      <td colspan="1">3</td>
-      <td>
-        <p>This property will cause the producer to automatically retry a failed send request. This property specifies the number of retries when such failures occur. Note that setting a non-zero value here can lead to duplicates in the case of network errors that cause a message to be sent but the acknowledgement to be lost.</p>
-     </td>
-    </tr>
-    <tr>
-      <td>retry.backoff.ms</td>
-      <td colspan="1">100</td>
-      <td>
-        <p>Before each retry, the producer refreshes the metadata of relevant topics to see if a new leader has been elected. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.</p>
-     </td>
-    </tr>
-    <tr>
-      <td>topic.metadata.refresh.interval.ms</td>
-      <td colspan="1">600 * 1000</td>
-      <td>
-        <p>The producer generally refreshes the topic metadata from brokers when there is a failure (partition missing, leader not available...). It will also poll regularly (default: every 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on failure. If you set this to zero, the metadata will get refreshed after each message sent (not recommended). Important note: the refresh happen only AFTER the message is sent, so if the producer never sends a message the metadata is never refreshed</p>
-     </td>
-    </tr>
-    <tr>
-      <td>queue.buffering.max.ms</td>
-      <td colspan="1">5000</td>
-      <td>Maximum time to buffer data when using async mode. For example a setting of 100 will try to batch together 100ms of messages to send at once. This will improve throughput but adds message delivery latency due to the buffering.</td>
-    </tr>
-    <tr>
-      <td>queue.buffering.max.messages</td>
-      <td colspan="1">10000</td>
-      <td>The maximum number of unsent messages that can be queued up the producer when using async mode before either the producer must be blocked or data must be dropped.</td>
-    </tr>
-    <tr>
-      <td>queue.enqueue.timeout.ms</td>
-      <td colspan="1">-1</td>
-      <td>
-        <p>The amount of time to block before dropping messages when running in async mode and the buffer has reached queue.buffering.max.messages. If set to 0 events will be enqueued immediately or dropped if the queue is full (the producer send call will never block). If set to -1 the producer will block indefinitely and never willingly drop a send.</p>
-     </td>
-    </tr>
-    <tr>
-      <td>batch.num.messages</td>
-      <td colspan="1">200</td>
-      <td>The number of messages to send in one batch when using async mode. The producer will wait until either this number of messages are ready to send or queue.buffer.max.ms is reached.</td>
-    </tr>
-    <tr>
-      <td>send.buffer.bytes</td>
-      <td colspan="1">100 * 1024</td>
-      <td>Socket write buffer size</td>
-    </tr>
-    <tr>
-      <td>client.id</td>
-      <td colspan="1">""</td>
-      <td>The client id is a user-specified string sent in each request to help trace calls. It should logically identify the application making the request.</td>
-    </tr>
-</tbody></table>
-<p>More details about producer configuration can be found in the scala class <code>kafka.producer.ProducerConfig</code>.</p>
-
-<h3><a id="newproducerconfigs">3.4 New Producer Configs</a></h3>
+<h3><a id="newconsumerconfigs">3.4 New Consumer Configs</a></h3>
+Since 0.9.0.0 we have been working on a replacement for our existing simple and high-level consumers. The code can be considered beta quality. Below is the configuration for the new consumer:
+<!--#include virtual="consumer_config.html" -->
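+
+<p>
+    As a minimal sketch of how these configs are used, the example below creates a new consumer, subscribes to a topic, and polls once; the broker address, group id, and topic name are placeholders.
+</p>
+<pre>
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+public class NewConsumerConfigExample {
+    public static void main(String[] args) {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker list
+        props.put("group.id", "test-group");              // placeholder consumer group
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+        KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;&gt;(props);
+        consumer.subscribe(Arrays.asList("my-topic"));
+        ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
+        for (ConsumerRecord&lt;String, String&gt; record : records)
+            System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
+        consumer.close();
+    }
+}
+</pre>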
 
-We are working on a replacement for our existing producer. The code is available in trunk now and can be considered beta quality. Below is the configuration for the new producer.
-
-<table class="data-table">
-	<tr>
-	<th>Name</th>
-	<th>Type</th>
-	<th>Default</th>
-	<th>Importance</th>
-	<th>Description</th>
-	</tr>
-	<tr>
-	<td>bootstrap.servers</td><td>list</td><td></td><td>high</td><td>A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Data will be load balanced over all servers irrespective of which servers are specified here for bootstrapping&mdash;this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form <code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down). If no server in this list is available sending data will fail until on becomes available.</td></tr>
-	<tr>
-	<td>acks</td><td>string</td><td>1</td><td>high</td><td>The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the  durability of records that are sent. The following settings are common:  <ul> <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the <code>retries</code> configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers
  have replicated it then the record will be lost. <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. <li>Other settings such as <code>acks=2</code> are also possible, and will require the given number of acknowledgements but this is generally less useful.</td></tr>
-	<tr>
-	<td>buffer.memory</td><td>long</td><td>33554432</td><td>high</td><td>The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by <code>block.on.buffer.full</code>. <p>This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.</td></tr>
-	<tr>
-	<td>compression.type</td><td>string</td><td>none</td><td>high</td><td>The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid  values are <code>none</code>, <code>gzip</code>, or <code>snappy</code>. Compression is of full batches of data,  so the efficacy of batching will also impact the compression ratio (more batching means better compression).</td></tr>
-	<tr>
-	<td>retries</td><td>int</td><td>0</td><td>high</td><td>Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second succeeds, then the second record may appear first.</td></tr>
-	<tr>
-	<td>batch.size</td><td>int</td><td>16384</td><td>medium</td><td>The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. <p>No attempt will be made to batch records larger than this size. <p>Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. <p>A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.</td></tr>
-	<tr>
-	<td>client.id</td><td>string</td><td></td><td>medium</td><td>The id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included with the request. The application can set any string it wants as this has no functional purpose other than in logging and metrics.</td></tr>
-	<tr>
-	<td>linger.ms</td><td>long</td><td>0</td><td>medium</td><td>The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay&mdash;that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get <code>batch.size</code> worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the spe
 cified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting <code>linger.ms=5</code>, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load.</td></tr>
-	<tr>
-	<td>max.request.size</td><td>int</td><td>1048576</td><td>medium</td><td>The maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.</td></tr>
-	<tr>
-	<td>receive.buffer.bytes</td><td>int</td><td>32768</td><td>medium</td><td>The size of the TCP receive buffer to use when reading data</td></tr>
-	<tr>
-	<td>send.buffer.bytes</td><td>int</td><td>131072</td><td>medium</td><td>The size of the TCP send buffer to use when sending data</td></tr>
-	<tr>
-	<td>timeout.ms</td><td>int</td><td>30000</td><td>medium</td><td>The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the <code>acks</code> configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.</td></tr>
-	<tr>
-	<td>block.on.buffer.full</td><td>boolean</td><td>true</td><td>low</td><td>When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default this setting is true and we block, however in some scenarios blocking is not desirable and it is better to immediately give an error. Setting this to <code>false</code> will accomplish that: the producer will throw a BufferExhaustedException if a recrord is sent and the buffer space is full.</td></tr>
-	<tr>
-	<td>metadata.fetch.timeout.ms</td><td>long</td><td>60000</td><td>low</td><td>The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This configuration controls the maximum amount of time we will block waiting for the metadata fetch to succeed before throwing an exception back to the client.</td></tr>
-	<tr>
-	<td>metadata.max.age.ms</td><td>long</td><td>300000</td><td>low</td><td>The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any  partition leadership changes to proactively discover any new brokers or partitions.</td></tr>
-	<tr>
-	<td>metric.reporters</td><td>list</td><td>[]</td><td>low</td><td>A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.</td></tr>
-	<tr>
-	<td>metrics.num.samples</td><td>int</td><td>2</td><td>low</td><td>The number of samples maintained to compute metrics.</td></tr>
-	<tr>
-	<td>metrics.sample.window.ms</td><td>long</td><td>30000</td><td>low</td><td>The metrics system maintains a configurable number of samples over a fixed window size. This configuration controls the size of the window. For example we might maintain two samples each measured over a 30 second period. When a window expires we erase and overwrite the oldest window.</td></tr>
-	<tr>
-	<td>reconnect.backoff.ms</td><td>long</td><td>10</td><td>low</td><td>The amount of time to wait before attempting to reconnect to a given host when a connection fails. This avoids a scenario where the client repeatedly attempts to connect to a host in a tight loop.</td></tr>
-	<tr>
-	<td>retry.backoff.ms</td><td>long</td><td>100</td><td>low</td><td>The amount of time to wait before attempting to retry a failed produce request to a given topic partition. This avoids repeated sending-and-failing in a tight loop.</td></tr>
-	</table>
+<h3><a id="connectconfigs">3.5 Kafka Connect Configs</a></h3>
+<!--#include virtual="connect_config.html" -->
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka-site/blob/8c4a140c/090/connect.html
----------------------------------------------------------------------
diff --git a/090/connect.html b/090/connect.html
new file mode 100644
index 0000000..8791ab0
--- /dev/null
+++ b/090/connect.html
@@ -0,0 +1,328 @@
+<!--~
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~-->
+
+<h3><a id="connect_overview">8.1 Overview</a></h3>
+
+Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to quickly define <i>connectors</i> that move large collections of data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export job can deliver data from Kafka topics into secondary storage and query systems or into batch systems for offline analysis.
+
+Kafka Connect features include:
+<ul>
+    <li><b>A common framework for Kafka connectors</b> - Kafka Connect standardizes integration of other data systems with Kafka, simplifying connector development, deployment, and management</li>
+    <li><b>Distributed and standalone modes</b> - scale up to a large, centrally managed service supporting an entire organization or scale down to development, testing, and small production deployments</li>
+    <li><b>REST interface</b> - submit and manage connectors to your Kafka Connect cluster via an easy-to-use REST API</li>
+    <li><b>Automatic offset management</b> - with just a little information from connectors, Kafka Connect can manage the offset commit process automatically so connector developers do not need to worry about this error-prone part of connector development</li>
+    <li><b>Distributed and scalable by default</b> - Kafka Connect builds on the existing group management protocol, so more workers can be added to scale up a Kafka Connect cluster</li>
+    <li><b>Streaming/batch integration</b> - leveraging Kafka's existing capabilities, Kafka Connect is an ideal solution for bridging streaming and batch data systems</li>
+</ul>
+
+<h3><a id="connect_user">8.2 User Guide</a></h3>
+
+The quickstart provides a brief example of how to run a standalone version of Kafka Connect. This section describes how to configure, run, and manage Kafka Connect in more detail.
+
+<h4>Running Kafka Connect</h4>
+
+Kafka Connect currently supports two modes of execution: standalone (single process) and distributed.
+
+In standalone mode all work is performed in a single process. This configuration is simpler to set up and get started with, and may be useful in situations where only one worker makes sense (e.g. collecting log files), but it does not benefit from some of the features of Kafka Connect such as fault tolerance. You can start a standalone process with the following command:
+
+<pre>
+&gt; bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
+</pre>
+
+The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by <code>config/server.properties</code>. It will require tweaking to use with a different configuration or production deployment.
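+
+As a rough sketch, a standalone worker configuration along the lines of <code>config/connect-standalone.properties</code> contains settings such as the following (the values shown are illustrative, not recommendations):
+
+<pre>
+bootstrap.servers=localhost:9092
+key.converter=org.apache.kafka.connect.json.JsonConverter
+value.converter=org.apache.kafka.connect.json.JsonConverter
+# Standalone mode keeps offsets in a local file rather than in a Kafka topic.
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+</pre>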
+
+The remaining parameters are connector configuration files. You may include as many as you want, but all will execute within the same process (on different threads).
+
+Distributed mode handles automatic balancing of work, allows you to scale up (or down) dynamically, and offers fault tolerance both in the active tasks and for configuration and offset commit data. Execution is very similar to standalone mode:
+
+<pre>
+&gt; bin/connect-distributed.sh config/connect-distributed.properties
+</pre>
+
+The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets. In particular, the following configuration parameters are critical to set before starting your cluster:
+
+<ul>
+    <li><code>group.id</code> (default <code>connect-cluster</code>) - unique name for the cluster, used in forming the Connect cluster group; note that this <b>must not conflict</b> with consumer group IDs</li>
+    <li><code>config.storage.topic</code> (default <code>connect-configs</code>) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic</li>
+    <li><code>offset.storage.topic</code> (default <code>connect-offsets</code>) - topic to use for storing offsets; this topic should have many partitions and be replicated</li>
+</ul>
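+
+Putting these together, a minimal distributed worker configuration, in the spirit of <code>config/connect-distributed.properties</code>, might look roughly like this (topic names are examples only):
+
+<pre>
+bootstrap.servers=localhost:9092
+group.id=connect-cluster
+config.storage.topic=connect-configs
+offset.storage.topic=connect-offsets
+key.converter=org.apache.kafka.connect.json.JsonConverter
+value.converter=org.apache.kafka.connect.json.JsonConverter
+</pre>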
+
+Note that in distributed mode the connector configurations are not passed on the command line. Instead, use the REST API described below to create, modify, and destroy connectors.
+
+
+<h4>Configuring Connectors</h4>
+
+Connector configurations are simple key-value mappings. For standalone mode these are defined in a properties file and passed to the Connect process on the command line. In distributed mode, they will be included in the JSON payload for the request that creates (or modifies) the connector.
+
+Most configurations are connector dependent, so they can't be outlined here. However, there are a few common options:
+
+<ul>
+    <li><code>name</code> - Unique name for the connector. Attempting to register again with the same name will fail.</li>
+    <li><code>connector.class</code> - The Java class for the connector</li>
+    <li><code>tasks.max</code> - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.</li>
+</ul>
+
+Sink connectors also have one additional option to control their input:
+<ul>
+    <li><code>topics</code> - A list of topics to use as input for this connector</li>
+</ul>
+
+For any other options, you should consult the documentation for the connector.
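+
+For example, a standalone-mode configuration for the file source connector described later in this guide might look roughly like the following; the file and topic values are placeholders:
+
+<pre>
+name=local-file-source
+connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
+tasks.max=1
+file=/tmp/test.txt
+topic=connect-test
+</pre>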
+
+
+<h4>REST API</h4>
+
+Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default this service runs on port 8083. The following are the currently supported endpoints:
+
+<ul>
+    <li><code>GET /connectors</code> - return a list of active connectors</li>
+    <li><code>POST /connectors</code> - create a new connector; the request body should be a JSON object containing a string <code>name</code> field and an object <code>config</code> field with the connector configuration parameters (an example request body is shown after this list)</li>
+    <li><code>GET /connectors/{name}</code> - get information about a specific connector</li>
+    <li><code>GET /connectors/{name}/config</code> - get the configuration parameters for a specific connector</li>
+    <li><code>PUT /connectors/{name}/config</code> - update the configuration parameters for a specific connector</li>
+    <li><code>GET /connectors/{name}/tasks</code> - get a list of tasks currently running for a connector</li>
+    <li><code>DELETE /connectors/{name}</code> - delete a connector, halting all tasks and deleting its configuration</li>
+</ul>
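+
+For instance, the body of a <code>POST /connectors</code> request for the file source connector sketched above could look like the following; all values are illustrative:
+
+<pre>
+{
+    "name": "local-file-source",
+    "config": {
+        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
+        "tasks.max": "1",
+        "file": "/tmp/test.txt",
+        "topic": "connect-test"
+    }
+}
+</pre>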
+
+<h3><a id="connect_development">8.3 Connector Development Guide</a></h3>
+
+This guide describes how developers can write new connectors for Kafka Connect to move data between Kafka and other systems. It briefly reviews a few key concepts and then describes how to create a simple connector.
+
+<h4>Core Concepts and APIs</h4>
+
+<h5>Connectors and Tasks</h5>
+
+To copy data between Kafka and another system, users create a <code>Connector</code> for the system they want to pull data from or push data to. Connectors come in two flavors: <code>SourceConnectors</code> import data from another system (e.g. <code>JDBCSourceConnector</code> would import a relational database into Kafka) and <code>SinkConnectors</code> export data (e.g. <code>HDFSSinkConnector</code> would export the contents of a Kafka topic to an HDFS file).
+
+<code>Connectors</code> do not perform any data copying themselves: their configuration describes the data to be copied, and the <code>Connector</code> is responsible for breaking that job into a set of <code>Tasks</code> that can be distributed to workers. These <code>Tasks</code> also come in two corresponding flavors: <code>SourceTask</code> and <code>SinkTask</code>.
+
+With an assignment in hand, each <code>Task</code> must copy its subset of the data to or from Kafka. In Kafka Connect, it should always be possible to frame these assignments as a set of input and output streams consisting of records with consistent schemas. Sometimes this mapping is obvious: each file in a set of log files can be considered a stream with each parsed line forming a record using the same schema and offsets stored as byte offsets in the file. In other cases it may require more effort to map to this model: a JDBC connector can map each table to a stream, but the offset is less clear. One possible mapping uses a timestamp column to generate queries incrementally returning new data, and the last queried timestamp can be used as the offset.
+
+
+<h5>Streams and Records</h5>
+
+Each stream should be a sequence of key-value records. Both the keys and values can have complex structure -- many primitive types are provided, but arrays, objects, and nested data structures can be represented as well. The runtime data format does not assume any particular serialization format; this conversion is handled internally by the framework.
+
+In addition to the key and value, records (both those generated by sources and those delivered to sinks) have associated stream IDs and offsets. These are used by the framework to periodically commit the offsets of data that have been processed so that in the event of failures, processing can resume from the last committed offsets, avoiding unnecessary reprocessing and duplication of events.
+
+<h5>Dynamic Connectors</h5>
+
+Not all jobs are static, so <code>Connector</code> implementations are also responsible for monitoring the external system for any changes that might require reconfiguration. For example, in the <code>JDBCSourceConnector</code> example, the <code>Connector</code> might assign a set of tables to each <code>Task</code>. When a new table is created, it must discover this so it can assign the new table to one of the <code>Tasks</code> by updating its configuration. When it notices a change that requires reconfiguration (or a change in the number of <code>Tasks</code>), it notifies the framework and the framework updates any corresponding <code>Tasks</code>.
+
+
+<h4>Developing a Simple Connector</h4>
+
+Developing a connector only requires implementing two interfaces, the <code>Connector</code> and <code>Task</code>. A simple example is included with the source code for Kafka in the <code>file</code> package. This connector is meant for use in standalone mode and has implementations of a <code>SourceConnector</code>/<code>SourceTask</code> to read each line of a file and emit it as a record and a <code>SinkConnector</code>/<code>SinkTask</code> that writes each record to a file.
+
+The rest of this section will walk through some code to demonstrate the key steps in creating a connector, but developers should also refer to the full example source code as many details are omitted for brevity.
+
+<h5>Connector Example</h5>
+
+We'll cover the <code>SourceConnector</code> as a simple example. <code>SinkConnector</code> implementations are very similar. Start by creating the class that inherits from <code>SourceConnector</code> and add a couple of fields that will store parsed configuration information (the filename to read from and the topic to send data to):
+
+<pre>
+public class FileStreamSourceConnector extends SourceConnector {
+    private String filename;
+    private String topic;
+</pre>
+
+The easiest method to fill in is <code>getTaskClass()</code>, which defines the class that should be instantiated in worker processes to actually read the data:
+
+<pre>
+@Override
+public Class&lt;? extends Task&gt; getTaskClass() {
+    return FileStreamSourceTask.class;
+}
+</pre>
+
+We will define the <code>FileStreamSourceTask</code> class below. Next, we add some standard lifecycle methods, <code>start()</code> and <code>stop()</code>:
+
+<pre>
+@Override
+public void start(Map&lt;String, String&gt; props) {
+    // The complete version includes error handling as well.
+    filename = props.get(FILE_CONFIG);
+    topic = props.get(TOPIC_CONFIG);
+}
+
+@Override
+public void stop() {
+    // Nothing to do since no background monitoring is required.
+}
+</pre>
+
+Finally, the real core of the implementation is in <code>getTaskConfigs()</code>. In this case we're only
+handling a single file, so even though we may be permitted to generate more tasks as per the
+<code>maxTasks</code> argument, we return a list with only one entry:
+
+<pre>
+@Override
+public List&lt;Map&lt;String, String&gt;&gt; getTaskConfigs(int maxTasks) {
+    ArrayList&lt;Map&lt;String, String&gt;&gt; configs = new ArrayList&lt;&gt;();
+    // Only one input stream makes sense.
+    Map&lt;String, String&gt; config = new HashMap&lt;&gt;();
+    if (filename != null)
+        config.put(FILE_CONFIG, filename);
+    config.put(TOPIC_CONFIG, topic);
+    configs.add(config);
+    return configs;
+}
+</pre>
+
+Even with multiple tasks, this method implementation is usually pretty simple. It just has to determine the set of input streams, which may require contacting the remote service it is pulling data from, and then divvy them up among the tasks. Because some patterns for splitting work among tasks are so common, some utilities are provided in <code>ConnectorUtils</code> to simplify these cases.
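+
+For instance, a hypothetical multi-file variant of this connector could use <code>ConnectorUtils.groupPartitions()</code> to divide its inputs among tasks. The <code>filenames</code> list and <code>FILES_CONFIG</code> key below are inventions for the sake of the sketch and are not part of the real example:
+
+<pre>
+// Sketch only: assumes a non-empty list of files and a FILES_CONFIG key holding a
+// comma-separated list. Requires org.apache.kafka.connect.util.ConnectorUtils.
+private List&lt;String&gt; filenames; // hypothetical: parsed from the connector config in start()
+
+@Override
+public List&lt;Map&lt;String, String&gt;&gt; getTaskConfigs(int maxTasks) {
+    List&lt;Map&lt;String, String&gt;&gt; configs = new ArrayList&lt;&gt;();
+    int numGroups = Math.min(filenames.size(), maxTasks);
+    for (List&lt;String&gt; group : ConnectorUtils.groupPartitions(filenames, numGroups)) {
+        Map&lt;String, String&gt; config = new HashMap&lt;&gt;();
+        config.put(FILES_CONFIG, String.join(",", group));
+        config.put(TOPIC_CONFIG, topic);
+        configs.add(config);
+    }
+    return configs;
+}
+</pre>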
+
+Note that this simple example does not include dynamic input. See the discussion in the next section for how to trigger updates to task configs.
+
+<h5>Task Example - Source Task</h5>
+
+Next we'll describe the implementation of the corresponding <code>SourceTask</code>. The implementation is short, but too long to cover completely in this guide. We'll use pseudo-code to describe most of the implementation, but you can refer to the source code for the full example.
+
+Just as with the connector, we need to create a class inheriting from the appropriate base <code>Task</code> class. It also has some standard lifecycle methods:
+
+
+<pre>
+public class FileStreamSourceTask extends SourceTask {
+    String filename;
+    InputStream stream;
+    String topic;
+
+    public void start(Map&lt;String, String&gt; props) {
+        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
+        stream = openOrThrowError(filename);
+        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
+    }
+
+    @Override
+    public synchronized void stop() {
+        stream.close();
+    }
+</pre>
+
+These are slightly simplified versions, but show that these methods should be relatively simple and the only work they should perform is allocating or freeing resources. There are two points to note about this implementation. First, the <code>start()</code> method does not yet handle resuming from a previous offset, which will be addressed in a later section. Second, the <code>stop()</code> method is synchronized. This will be necessary because <code>SourceTasks</code> are given a dedicated thread which they can block indefinitely, so they need to be stopped with a call from a different thread in the Worker.
+
+Next, we implement the main functionality of the task, the <code>poll()</code> method which gets events from the input system and returns a <code>List&lt;SourceRecord&gt;</code>:
+
+<pre>
+@Override
+public List&lt;SourceRecord&gt; poll() throws InterruptedException {
+    try {
+        ArrayList&lt;SourceRecord&gt; records = new ArrayList&lt;&gt;();
+        while (streamValid(stream) && records.isEmpty()) {
+            LineAndOffset line = readToNextLine(stream);
+            if (line != null) {
+                Map&lt;String, Object&gt; sourcePartition = Collections.singletonMap("filename", filename);
+                Map&lt;String, Object&gt; sourceOffset = Collections.singletonMap("position", streamOffset);
+                records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
+            } else {
+                Thread.sleep(1);
+            }
+        }
+        return records;
+    } catch (IOException e) {
+        // Underlying stream was killed, probably as a result of calling stop. Allow to return
+        // null, and driving thread will handle any shutdown if necessary.
+    }
+    return null;
+}
+</pre>
+
+Again, we've omitted some details, but we can see the important steps: the <code>poll()</code> method is going to be called repeatedly, and for each call it will loop trying to read records from the file. For each line it reads, it also tracks the file offset. It uses this information to create an output <code>SourceRecord</code> with four pieces of information: the source partition (there is only one, the single file being read), source offset (byte offset in the file), output topic name, and output value (the line, and we include a schema indicating this value will always be a string). Other variants of the <code>SourceRecord</code> constructor can also include a specific output partition and a key.
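+
+A sketch of such a variant, assuming a constructor overload that additionally takes the output partition, a key schema, and a key:
+
+<pre>
+// Illustrative only: supply an explicit key and leave the output partition up to Kafka.
+records.add(new SourceRecord(sourcePartition, sourceOffset, topic,
+        null,                           // output partition; null lets Kafka decide
+        Schema.STRING_SCHEMA, filename, // record key and its schema
+        Schema.STRING_SCHEMA, line));   // record value and its schema
+</pre>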
+
+Note that this implementation uses the normal Java <code>InputStream</code> interface and may sleep if data is not available. This is acceptable because Kafka Connect provides each task with a dedicated thread. While task implementations have to conform to the basic <code>poll()</code> interface, they have a lot of flexibility in how they are implemented. In this case, an NIO-based implementation would be more efficient, but this simple approach works, is quick to implement, and is compatible with older versions of Java.
+
+<h5>Sink Tasks</h5>
+
+The previous section described how to implement a simple <code>SourceTask</code>. Unlike <code>SourceConnector</code> and <code>SinkConnector</code>, <code>SourceTask</code> and <code>SinkTask</code> have very different interfaces because <code>SourceTask</code> uses a pull interface and <code>SinkTask</code> uses a push interface. Both share the common lifecycle methods, but the <code>SinkTask</code> interface is quite different:
+
+<pre>
+public abstract class SinkTask implements Task {
+    public void initialize(SinkTaskContext context) { ... }
+
+    public abstract void put(Collection&lt;SinkRecord&gt; records);
+
+    public abstract void flush(Map&lt;TopicPartition, Long&gt; offsets);
+}
+</pre>
+
+The <code>SinkTask</code> documentation contains full details, but this interface is nearly as simple as the <code>SourceTask</code>. The <code>put()</code> method should contain most of the implementation, accepting sets of <code>SinkRecords</code>, performing any required translation, and storing them in the destination system. This method does not need to ensure the data has been fully written to the destination system before returning. In fact, in many cases internal buffering will be useful so an entire batch of records can be sent at once, reducing the overhead of inserting events into the downstream data store. The <code>SinkRecords</code> contain essentially the same information as <code>SourceRecords</code>: Kafka topic, partition, offset, and the event key and value.
+
+The <code>flush()</code> method is used during the offset commit process, which allows tasks to recover from failures and resume from a safe point such that no events will be missed. The method should push any outstanding data to the destination system and then block until the write has been acknowledged. The <code>offsets</code> parameter can often be ignored, but is useful in some cases where implementations want to store offset information in the destination store to provide exactly-once
+delivery. For example, an HDFS connector could do this and use atomic move operations to make sure the <code>flush()</code> operation atomically commits the data and offsets to a final location in HDFS.
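+
+To make this concrete, here is a rough sketch of the data-handling methods of a hypothetical file-based sink task; the <code>writer</code> object and its buffering behaviour are invented for illustration:
+
+<pre>
+// Sketch only: 'writer' stands in for whatever client writes to the destination system.
+private java.io.PrintWriter writer;   // assume this was opened in start()
+
+@Override
+public void put(Collection&lt;SinkRecord&gt; records) {
+    for (SinkRecord record : records) {
+        // Translate and buffer each record; nothing has to be durable yet.
+        writer.write(record.value().toString());
+    }
+}
+
+@Override
+public void flush(Map&lt;TopicPartition, Long&gt; offsets) {
+    // Push buffered data and wait until the destination has accepted it,
+    // so that the framework can safely commit these offsets.
+    writer.flush();
+}
+</pre>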
+
+
+<h5>Resuming from Previous Offsets</h5>
+
+The <code>SourceTask</code> implementation included a stream ID (the input filename) and offset (position in the file) with each record. The framework uses this to commit offsets periodically so that in the case of a failure, the task can recover and minimize the number of events that are reprocessed and possibly duplicated (or to resume from the most recent offset if Kafka Connect was stopped gracefully, e.g. in standalone mode or due to a job reconfiguration). This commit process is completely automated by the framework, but only the connector knows how to seek back to the right position in the input stream to resume from that location.
+
+To correctly resume upon startup, the task can use the <code>SourceTaskContext</code> passed into its <code>initialize()</code> method to access the offset data. In <code>initialize()</code>, we would add a bit more code to read the offset (if it exists) and seek to that position:
+
+<pre>
+    stream = new FileInputStream(filename);
+    Map&lt;String, Object&gt; offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
+    if (offset != null) {
+        Long lastRecordedOffset = (Long) offset.get("position");
+        if (lastRecordedOffset != null)
+            seekToOffset(stream, lastRecordedOffset);
+    }
+</pre>
+
+Of course, you might need to read many keys for each of the input streams. The <code>OffsetStorageReader</code> interface also allows you to issue bulk reads to efficiently load all offsets, then apply them by seeking each input stream to the appropriate position.
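+
+A sketch of such a bulk read, assuming the bulk <code>offsets()</code> method on <code>OffsetStorageReader</code> and a <code>filenames</code> list rather than the single file used in the example:
+
+<pre>
+// Build one lookup key per input stream and fetch all committed offsets in a single call.
+List&lt;Map&lt;String, String&gt;&gt; partitions = new ArrayList&lt;&gt;();
+for (String file : filenames)
+    partitions.add(Collections.singletonMap(FILENAME_FIELD, file));
+Map&lt;Map&lt;String, String&gt;, Map&lt;String, Object&gt;&gt; offsets =
+    context.offsetStorageReader().offsets(partitions);
+</pre>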
+
+<h4>Dynamic Input/Output Streams</h4>
+
+Kafka Connect is intended to define bulk data copying jobs, such as copying an entire database rather than creating many jobs to copy each table individually. One consequence of this design is that the set of input or output streams for a connector can vary over time.
+
+Source connectors need to monitor the source system for changes, e.g. table additions/deletions in a database. When they pick up changes, they should notify the framework via the <code>ConnectorContext</code> object that reconfiguration is necessary. For example, in a <code>SourceConnector</code>:
+
+
+<pre>
+if (inputsChanged())
+    this.context.requestTaskReconfiguration();
+</pre>
+
+The framework will promptly request new configuration information and update the tasks, allowing them to gracefully commit their progress before reconfiguring them. Note that in the <code>SourceConnector</code> this monitoring is currently left up to the connector implementation. If an extra thread is required to perform this monitoring, the connector must allocate it itself.
+
+Ideally this code for monitoring changes would be isolated to the <code>Connector</code> and tasks would not need to worry about them. However, changes can also affect tasks, most commonly when one of their input streams is destroyed in the input system, e.g. if a table is dropped from a database. If the <code>Task</code> encounters the issue before the <code>Connector</code>, which will be common if the <code>Connector</code> needs to poll for changes, the <code>Task</code> will need to handle the subsequent error. Thankfully, this can usually be handled simply by catching and handling the appropriate exception.
+
+<code>SinkConnectors</code> usually only have to handle the addition of streams, which may translate to new entries in their outputs (e.g., a new database table). The framework manages any changes to the Kafka input, such as when the set of input topics changes because of a regex subscription. <code>SinkTasks</code> should expect new input streams, which may require creating new resources in the downstream system, such as a new table in a database. The trickiest situation to handle in these cases may be conflicts between multiple <code>SinkTasks</code> seeing a new input stream for the first time and simultaneously trying to create the new resource. <code>SinkConnectors</code>, on the other hand, will generally require no special code for handling a dynamic set of streams.
+
+<h4>Working with Schemas</h4>
+
+The FileStream connectors are good examples because they are simple, but they also have trivially structured data -- each line is just a string. Almost all practical connectors will need schemas with more complex data formats.
+
+To create more complex data, you'll need to work with the Kafka Connect <code>data</code> API. Most structured records will need to interact with two classes in addition to primitive types: <code>Schema</code> and <code>Struct</code>.
+
+The API documentation provides a complete reference, but here is a simple example creating a <code>Schema</code> and <code>Struct</code>:
+
+<pre>
+Schema schema = SchemaBuilder.struct().name(NAME)
+                    .field("name", Schema.STRING_SCHEMA)
+                    .field("age", Schema.INT_SCHEMA)
+                    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
+                    .build();
+
+Struct struct = new Struct(schema)
+                           .put("name", "Barbara Liskov")
+                           .put("age", 75)
+                           .build();
+</pre>
+
+If you are implementing a source connector, you'll need to decide when and how to create schemas, and you should avoid recomputing them wherever possible. For example, if your connector is guaranteed to have a fixed schema, create it statically and reuse a single instance.
+
+However, many connectors will have dynamic schemas. One simple example of this is a database connector. Considering even just a single table, the schema will not be predefined for the entire connector (as it varies from table to table). But it also may not be fixed for a single table over the lifetime of the connector since the user may execute an <code>ALTER TABLE</code> command. The connector must be able to detect these changes and react appropriately.
+
+Sink connectors are usually simpler because they are consuming data and therefore do not need to create schemas. However, they should take just as much care to validate that the schemas they receive have the expected format. When the schema does not match -- usually indicating the upstream producer is generating invalid data that cannot be correctly translated to the destination system -- sink connectors should throw an exception to indicate this error to the system.
+