You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2015/02/04 05:58:45 UTC

[6/6] kafka git commit: KAFKA-1915: Add checkstyle for java code.

KAFKA-1915: Add checkstyle for java code.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1c6d5bba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1c6d5bba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1c6d5bba

Branch: refs/heads/trunk
Commit: 1c6d5bbac672cbf49591aed0889510b10e3285fa
Parents: f1ba4ff
Author: Jay Kreps <ja...@gmail.com>
Authored: Mon Feb 2 21:36:21 2015 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Feb 3 09:16:55 2015 -0800

----------------------------------------------------------------------
 README.md                                       |    3 +
 build.gradle                                    |    6 +
 checkstyle/checkstyle.xml                       |   83 +
 checkstyle/import-control.xml                   |  100 +
 .../org/apache/kafka/clients/ClientUtils.java   |   48 +
 .../kafka/clients/ClusterConnectionStates.java  |    9 +-
 .../java/org/apache/kafka/clients/Metadata.java |  170 ++
 .../org/apache/kafka/clients/NetworkClient.java |    3 +-
 .../kafka/clients/consumer/CommitType.java      |   12 +
 .../kafka/clients/consumer/ConsumerConfig.java  |    8 +-
 .../kafka/clients/consumer/ConsumerRecords.java |   40 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |   43 +-
 .../clients/consumer/internals/Heartbeat.java   |   12 +
 .../NoOpConsumerRebalanceCallback.java          |    4 +-
 .../consumer/internals/SubscriptionState.java   |   16 +-
 .../kafka/clients/producer/KafkaProducer.java   |   25 +-
 .../apache/kafka/clients/producer/Producer.java |    6 +-
 .../kafka/clients/producer/ProducerConfig.java  |   10 +-
 .../clients/producer/internals/BufferPool.java  |    8 +-
 .../clients/producer/internals/Metadata.java    |  170 --
 .../internals/ProduceRequestResult.java         |    1 -
 .../producer/internals/RecordAccumulator.java   |   40 +-
 .../clients/producer/internals/Sender.java      |    1 +
 .../clients/tools/ProducerPerformance.java      |    8 +-
 .../java/org/apache/kafka/common/Cluster.java   |    4 +-
 .../org/apache/kafka/common/MetricName.java     |    4 +-
 .../org/apache/kafka/common/PartitionInfo.java  |    2 +-
 .../apache/kafka/common/config/ConfigDef.java   |   82 +-
 .../NotEnoughReplicasAfterAppendException.java  |   33 +-
 .../errors/NotEnoughReplicasException.java      |   28 +-
 .../message/KafkaLZ4BlockInputStream.java       |  233 --
 .../message/KafkaLZ4BlockOutputStream.java      |  387 ---
 .../kafka/common/metrics/JmxReporter.java       |   57 +-
 .../org/apache/kafka/common/metrics/Sensor.java |   12 +-
 .../apache/kafka/common/metrics/stats/Rate.java |    2 +-
 .../kafka/common/network/NetworkReceive.java    |    2 +-
 .../apache/kafka/common/network/Selector.java   |    9 +-
 .../apache/kafka/common/protocol/ApiKeys.java   |   12 +-
 .../apache/kafka/common/protocol/Protocol.java  |  590 +++--
 .../kafka/common/protocol/types/Struct.java     |    2 +-
 .../common/record/ByteBufferOutputStream.java   |    2 +-
 .../apache/kafka/common/record/Compressor.java  |   29 +-
 .../common/record/KafkaLZ4BlockInputStream.java |  234 ++
 .../record/KafkaLZ4BlockOutputStream.java       |  392 +++
 .../kafka/common/record/MemoryRecords.java      |   36 +-
 .../requests/ConsumerMetadataRequest.java       |    2 +-
 .../requests/ConsumerMetadataResponse.java      |    2 +-
 .../kafka/common/requests/FetchRequest.java     |    2 +-
 .../kafka/common/requests/FetchResponse.java    |    2 +-
 .../kafka/common/requests/HeartbeatRequest.java |    2 +-
 .../common/requests/HeartbeatResponse.java      |    2 +-
 .../kafka/common/requests/JoinGroupRequest.java |    2 +-
 .../common/requests/JoinGroupResponse.java      |    2 +-
 .../common/requests/ListOffsetRequest.java      |    4 +-
 .../common/requests/ListOffsetResponse.java     |    2 +-
 .../kafka/common/requests/MetadataRequest.java  |    2 +-
 .../kafka/common/requests/MetadataResponse.java |    8 +-
 .../common/requests/OffsetCommitRequest.java    |    4 +-
 .../common/requests/OffsetCommitResponse.java   |    2 +-
 .../common/requests/OffsetFetchRequest.java     |    4 +-
 .../common/requests/OffsetFetchResponse.java    |    2 +-
 .../kafka/common/requests/ProduceRequest.java   |    2 +-
 .../kafka/common/requests/ProduceResponse.java  |    2 +-
 .../kafka/common/requests/RequestHeader.java    |    8 +-
 .../kafka/common/requests/ResponseHeader.java   |    4 +-
 .../apache/kafka/common/utils/ClientUtils.java  |   49 -
 .../org/apache/kafka/common/utils/Crc32.java    | 2338 ++----------------
 .../org/apache/kafka/common/utils/Utils.java    |   44 +-
 .../apache/kafka/clients/ClientUtilsTest.java   |   42 +
 .../org/apache/kafka/clients/MockClient.java    |   18 +-
 .../apache/kafka/clients/NetworkClientTest.java |   17 +-
 .../clients/consumer/MockConsumerTest.java      |   16 +
 .../internals/SubscriptionStateTest.java        |   16 +
 .../kafka/clients/producer/BufferPoolTest.java  |   14 +-
 .../kafka/clients/producer/MetadataTest.java    |    8 +-
 .../clients/producer/MockProducerTest.java      |    1 +
 .../kafka/clients/producer/PartitionerTest.java |    2 +-
 .../clients/producer/RecordAccumulatorTest.java |    3 +-
 .../kafka/clients/producer/RecordSendTest.java  |    5 +-
 .../kafka/clients/producer/SenderTest.java      |    6 +-
 .../kafka/common/config/AbstractConfigTest.java |  106 +-
 .../kafka/common/config/ConfigDefTest.java      |    5 +-
 .../common/metrics/FakeMetricsReporter.java     |   32 +
 .../kafka/common/metrics/MetricsTest.java       |    4 +-
 .../common/metrics/stats/HistogramTest.java     |    1 -
 .../kafka/common/network/SelectorTest.java      |    1 -
 .../types/ProtocolSerializationTest.java        |   14 +-
 .../kafka/common/record/MemoryRecordsTest.java  |    2 +-
 .../apache/kafka/common/record/RecordTest.java  |    4 +-
 .../common/requests/RequestResponseTest.java    |   23 +-
 .../common/serialization/SerializationTest.java |    4 +-
 .../kafka/common/utils/ClientUtilsTest.java     |   42 -
 .../org/apache/kafka/common/utils/CrcTest.java  |    8 +-
 .../org/apache/kafka/test/Microbenchmarks.java  |    1 -
 .../java/org/apache/kafka/test/TestUtils.java   |   16 +-
 .../consumer/ConsumerRebalanceListener.java     |    3 -
 .../kafka/message/CompressionFactory.scala      |    2 +-
 .../scala/kafka/tools/KafkaMigrationTool.java   |   12 +-
 core/src/main/scala/kafka/utils/Crc32.java      |   40 +-
 .../java/kafka/examples/SimpleConsumerDemo.java |    9 +-
 100 files changed, 2225 insertions(+), 3721 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 9bdcf70..784daaf 100644
--- a/README.md
+++ b/README.md
@@ -98,6 +98,9 @@ Please note for this to work you should create/update `~/.gradle/gradle.properti
 
 ### Determining how transitive dependencies are added ###
     ./gradlew core:dependencies --configuration runtime
+	
+### Running checkstyle on the java code ###
+    ./gradlew checkstyleMain checkstyleTest
 
 ### Running in Vagrant ###
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6844372..0f0fe60 100644
--- a/build.gradle
+++ b/build.gradle
@@ -345,6 +345,7 @@ project(':examples') {
 }
 
 project(':clients') {
+  apply plugin: 'checkstyle'
   archivesBaseName = "kafka-clients"
 
   dependencies {
@@ -379,4 +380,9 @@ project(':clients') {
   artifacts {
     archives testJar
   }
+  
+  checkstyle {
+     configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
new file mode 100644
index 0000000..a215ff3
--- /dev/null
+++ b/checkstyle/checkstyle.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN" 
+     "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+// 
+//    http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+--> 
+<module name="Checker">
+  <property name="localeLanguage" value="en"/>
+  
+  <module name="FileTabCharacter"/>
+  
+  <!-- header -->
+  <module name="RegexpHeader">
+    <property name="header" value="/\*\*\nLicensed to the Apache.*"/>
+  </module>
+  
+  <module name="TreeWalker">
+    
+    <!-- code cleanup -->
+    <module name="UnusedImports"/>
+    <module name="RedundantImport"/>
+    <module name="IllegalImport" />
+    <module name="EqualsHashCode"/>
+    <module name="SimplifyBooleanExpression"/>
+    <module name="OneStatementPerLine"/>
+    <module name="UnnecessaryParentheses" />
+    <module name="SimplifyBooleanReturn"/>
+    
+    <!-- style -->
+    <module name="DefaultComesLast"/>
+    <module name="EmptyStatement"/>
+    <module name="ArrayTypeStyle"/>
+    <module name="UpperEll"/>
+    <module name="LeftCurly"/>
+    <module name="RightCurly"/>
+    <module name="EmptyStatement"/>
+    <module name="ConstantName">
+      <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+    </module>
+    <module name="LocalVariableName"/>
+    <module name="LocalFinalVariableName"/>
+    <module name="ClassTypeParameterName"/>
+    <module name="MemberName"/>
+    <module name="MethodTypeParameterName"/>
+    <module name="PackageName"/>
+    <module name="ParameterName"/>
+    <module name="StaticVariableName"/>
+    <module name="TypeName"/>
+    
+    <!-- dependencies -->
+    <module name="ImportControl">
+      <property name="file" value="${basedir}/checkstyle/import-control.xml"/>
+    </module>
+    
+    <!-- whitespace -->
+    <module name="GenericWhitespace"/>
+    <module name="NoWhitespaceBefore"/>
+    <module name="WhitespaceAfter" />
+    <module name="NoWhitespaceAfter"/>
+    <module name="WhitespaceAround">
+      <property name="allowEmptyConstructors" value="true"/>
+      <property name="allowEmptyMethods" value="true"/>
+    </module>
+    <module name="Indentation"/>
+    <module name="MethodParamPad"/>
+    <module name="ParenPad"/>
+    <module name="TypecastParenPad"/>
+  </module>
+</module>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
new file mode 100644
index 0000000..cca4b38
--- /dev/null
+++ b/checkstyle/import-control.xml
@@ -0,0 +1,100 @@
+<!DOCTYPE import-control PUBLIC
+    "-//Puppy Crawl//DTD Import Control 1.1//EN"
+    "http://www.puppycrawl.com/dtds/import_control_1_1.dtd">
+<!--
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+// 
+//    http://www.apache.org/licenses/LICENSE-2.0
+// 
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+--> 
+<import-control pkg="org.apache.kafka">
+	
+	<!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE -->
+	
+	<!-- common library dependencies -->
+	<allow pkg="java" />
+	<allow pkg="javax.management" />
+	<allow pkg="org.slf4j" />
+	<allow pkg="org.junit" />
+	
+	<!-- no one depends on the server -->
+	<disallow pkg="kafka" />
+	
+	<!-- anyone can use public classes -->
+	<allow pkg="org.apache.kafka.common" exact-match="true" />
+	<allow pkg="org.apache.kafka.common.utils" />
+	
+	<subpackage name="common">
+		<disallow pkg="org.apache.kafka.clients" />
+		<allow pkg="org.apache.kafka.common" exact-match="true" />
+		<allow pkg="org.apache.kafka.test" />
+	
+		<subpackage name="config">
+			<allow pkg="org.apache.kafka.common.config" />
+			<!-- for testing -->
+			<allow pkg="org.apache.kafka.common.metrics" />
+		</subpackage>
+	
+		<subpackage name="metrics">
+			<allow pkg="org.apache.kafka.common.metrics" />
+		</subpackage>
+	
+		<subpackage name="network">
+			<allow pkg="org.apache.kafka.common.metrics" />
+		</subpackage>
+	
+		<subpackage name="protocol">
+			<allow pkg="org.apache.kafka.common.errors" />
+			<allow pkg="org.apache.kafka.common.protocol.types" />
+		</subpackage>
+	
+		<subpackage name="record">
+			<allow pkg="net.jpountz" />
+			<allow pkg="org.apache.kafka.common.record" />
+		</subpackage>
+	
+		<subpackage name="requests">
+			<allow pkg="org.apache.kafka.common.protocol" />
+			<allow pkg="org.apache.kafka.common.network" />
+		</subpackage>
+	
+		<subpackage name="serialization">
+			<allow class="org.apache.kafka.common.errors.SerializationException" />
+		</subpackage>
+	</subpackage>
+
+	<subpackage name="clients">
+		<allow pkg="org.apache.kafka.common" />
+		<allow pkg="org.slf4j" />
+		<allow pkg="org.apache.kafka.clients" exact-match="true"/>
+		<allow pkg="org.apache.kafka.test" />
+	
+		<subpackage name="consumer">
+			<allow pkg="org.apache.kafka.clients.consumer" />
+		</subpackage>
+	
+		<subpackage name="producer">
+			<allow pkg="org.apache.kafka.clients.producer" />
+		</subpackage>
+	
+		<subpackage name="tools">
+			<allow pkg="org.apache.kafka.clients.producer" />
+			<allow pkg="org.apache.kafka.clients.consumer" />
+		</subpackage>
+	</subpackage>
+
+	<subpackage name="test">
+		<allow pkg="org.apache.kafka" />
+	</subpackage>
+	
+</import-control>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
new file mode 100644
index 0000000..d0da5d7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import static org.apache.kafka.common.utils.Utils.getHost;
+import static org.apache.kafka.common.utils.Utils.getPort;
+
+public class ClientUtils {
+
+    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
+        List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+        for (String url : urls) {
+            if (url != null && url.length() > 0) {
+                String host = getHost(url);
+                Integer port = getPort(url);
+                if (host == null || port == null)
+                    throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+                try {
+                    InetSocketAddress address = new InetSocketAddress(host, port);
+                    if (address.isUnresolved())
+                        throw new ConfigException("DNS resolution failed for url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+                    addresses.add(address);
+                } catch (NumberFormatException e) {
+                    throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+                }
+            }
+        }
+        if (addresses.size() < 1)
+            throw new ConfigException("No bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+        return addresses;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 574287d..da76cc2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -69,8 +69,7 @@ final class ClusterConnectionStates {
         long timeWaited = now - state.lastConnectAttemptMs;
         if (state.state == ConnectionState.DISCONNECTED) {
             return Math.max(this.reconnectBackoffMs - timeWaited, 0);
-        }
-        else {
+        } else {
             // When connecting or connected, we should be able to delay indefinitely since other events (connection or
             // data acked) will cause a wakeup once data can be sent.
             return Long.MAX_VALUE;
@@ -109,7 +108,8 @@ final class ClusterConnectionStates {
      * @param node The node we have connected to
      */
     public void connected(int node) {
-        nodeState(node).state = ConnectionState.CONNECTED;
+        NodeConnectionState nodeState = nodeState(node);
+        nodeState.state = ConnectionState.CONNECTED;
     }
 
     /**
@@ -117,7 +117,8 @@ final class ClusterConnectionStates {
      * @param node The node we have disconnected from
      */
     public void disconnected(int node) {
-        nodeState(node).state = ConnectionState.DISCONNECTED;
+        NodeConnectionState nodeState = nodeState(node);
+        nodeState.state = ConnectionState.DISCONNECTED;
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
new file mode 100644
index 0000000..b8cdd14
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class encapsulating some of the logic around metadata.
+ * <p>
+ * This class is shared by the client thread (for partitioning) and the background sender thread.
+ * 
+ * Metadata is maintained for only a subset of topics, which can be added to over time. Requesting metadata for a
+ * topic that we do not yet have any metadata for will trigger a metadata update.
+ */
+public final class Metadata {
+
+    private static final Logger log = LoggerFactory.getLogger(Metadata.class);
+
+    private final long refreshBackoffMs;
+    private final long metadataExpireMs;
+    private int version;
+    private long lastRefreshMs;
+    private Cluster cluster;
+    private boolean needUpdate;
+    private final Set<String> topics;
+
+    /**
+     * Create a metadata instance with reasonable defaults
+     */
+    public Metadata() {
+        this(100L, 60 * 60 * 1000L);
+    }
+
+    /**
+     * Create a new Metadata instance
+     * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
+     *        polling
+     * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
+     */
+    public Metadata(long refreshBackoffMs, long metadataExpireMs) {
+        this.refreshBackoffMs = refreshBackoffMs;
+        this.metadataExpireMs = metadataExpireMs;
+        this.lastRefreshMs = 0L;
+        this.version = 0;
+        this.cluster = Cluster.empty();
+        this.needUpdate = false;
+        this.topics = new HashSet<String>();
+    }
+
+    /**
+     * Get the current cluster info without blocking
+     */
+    public synchronized Cluster fetch() {
+        return this.cluster;
+    }
+
+    /**
+     * Add the topic to maintain in the metadata
+     */
+    public synchronized void add(String topic) {
+        topics.add(topic);
+    }
+
+    /**
+     * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
+     * current info can be updated (i.e. backoff time has elapsed); if an update has been requested then the expiry time
+     * is now
+     */
+    public synchronized long timeToNextUpdate(long nowMs) {
+        long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
+        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
+        return Math.max(timeToExpire, timeToAllowUpdate);
+    }
+
+    /**
+     * Request an update of the current cluster metadata info, return the current version before the update
+     */
+    public synchronized int requestUpdate() {
+        this.needUpdate = true;
+        return this.version;
+    }
+
+    /**
+     * Wait for metadata update until the current version is larger than the last version we know of
+     */
+    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
+        if (maxWaitMs < 0) {
+            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
+        }
+        long begin = System.currentTimeMillis();
+        long remainingWaitMs = maxWaitMs;
+        while (this.version <= lastVersion) {
+            try {
+                if (remainingWaitMs != 0) {
+                    wait(remainingWaitMs);
+                }
+            } catch (InterruptedException e) { /* this is fine */
+            }
+            long elapsed = System.currentTimeMillis() - begin;
+            if (elapsed >= maxWaitMs)
+                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+            remainingWaitMs = maxWaitMs - elapsed;
+        }
+    }
+
+    /**
+     * Add one or more topics to maintain metadata for
+     */
+    public synchronized void addTopics(String... topics) {
+        for (String topic : topics)
+            this.topics.add(topic);
+        requestUpdate();
+    }
+
+    /**
+     * Get the list of topics we are currently maintaining metadata for
+     */
+    public synchronized Set<String> topics() {
+        return new HashSet<String>(this.topics);
+    }
+
+    /**
+     * Update the cluster metadata
+     */
+    public synchronized void update(Cluster cluster, long now) {
+        this.needUpdate = false;
+        this.lastRefreshMs = now;
+        this.version += 1;
+        this.cluster = cluster;
+        notifyAll();
+        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
+    }
+    
+    /**
+     * @return The current metadata version
+     */
+    public synchronized int version() {
+        return this.version;
+    }
+
+    /**
+     * The last time metadata was updated.
+     */
+    public synchronized long lastUpdate() {
+        return this.lastRefreshMs;
+    }
+
+    /**
+     * The metadata refresh backoff in ms
+     */
+    public long refreshBackoff() {
+        return refreshBackoffMs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 5950191..fef90a0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,7 +19,6 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 
-import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.network.NetworkReceive;
@@ -199,7 +198,7 @@ public class NetworkClient implements KafkaClient {
         // should we update our metadata?
         long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
         long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
-        long waitForMetadataFetch = (this.metadataFetchInProgress ? Integer.MAX_VALUE : 0);
+        long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
         // if there is no node available to connect, back off refreshing metadata
         long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                                         waitForMetadataFetch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
index 072cc2e..7548a9b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java
@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
 package org.apache.kafka.clients.consumer;
 
 public enum CommitType {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 6d4ff7c..5fb2100 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.config.ConfigDef.Type;
  * The consumer configuration keys
  */
 public class ConsumerConfig extends AbstractConfig {
-    private static final ConfigDef config;
+    private static final ConfigDef CONFIG;
 
     /*
      * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
@@ -154,7 +154,7 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
 
     static {
-        config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
+        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
                                         Importance.HIGH,
                                         CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
@@ -277,11 +277,11 @@ public class ConsumerConfig extends AbstractConfig {
     }
 
     ConsumerConfig(Map<? extends Object, ? extends Object> props) {
-        super(config, props);
+        super(CONFIG, props);
     }
 
     public static void main(String[] args) {
-        System.out.println(config.toHtmlTable());
+        System.out.println(CONFIG.toHtmlTable());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
index 416d703..305ec8e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
@@ -26,11 +26,11 @@ import org.apache.kafka.common.utils.AbstractIterator;
  * particular topic. There is one for every topic returned by a
  * {@link Consumer#poll(long)} operation.
  */
-public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>> {
+public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
 
-    private final Map<TopicPartition, List<ConsumerRecord<K,V>>> records;
+    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
 
-    public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K,V>>> records) {
+    public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
         this.records = records;
     }
 
@@ -39,8 +39,8 @@ public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>> {
      * 
      * @param partition The partition to get records for
      */
-    public Iterable<ConsumerRecord<K,V>> records(TopicPartition partition) {
-        List<ConsumerRecord<K,V>> recs = this.records.get(partition);
+    public Iterable<ConsumerRecord<K, V>> records(TopicPartition partition) {
+        List<ConsumerRecord<K, V>> recs = this.records.get(partition);
         if (recs == null)
             return Collections.emptyList();
         else
@@ -50,20 +50,20 @@ public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>> {
     /**
      * Get just the records for the given topic
      */
-    public Iterable<ConsumerRecord<K,V>> records(String topic) {
+    public Iterable<ConsumerRecord<K, V>> records(String topic) {
         if (topic == null)
             throw new IllegalArgumentException("Topic must be non-null.");
-        List<List<ConsumerRecord<K,V>>> recs = new ArrayList<List<ConsumerRecord<K,V>>>();
-        for (Map.Entry<TopicPartition, List<ConsumerRecord<K,V>>> entry : records.entrySet()) {
+        List<List<ConsumerRecord<K, V>>> recs = new ArrayList<List<ConsumerRecord<K, V>>>();
+        for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
             if (entry.getKey().equals(topic))
                 recs.add(entry.getValue());
         }
-        return new ConcatenatedIterable<K,V>(recs);
+        return new ConcatenatedIterable<K, V>(recs);
     }
 
     @Override
-    public Iterator<ConsumerRecord<K,V>> iterator() {
-        return new ConcatenatedIterable<K,V>(records.values()).iterator();
+    public Iterator<ConsumerRecord<K, V>> iterator() {
+        return new ConcatenatedIterable<K, V>(records.values()).iterator();
     }
     
     /**
@@ -71,26 +71,26 @@ public class ConsumerRecords<K,V> implements Iterable<ConsumerRecord<K,V>> {
      */
     public int count() {
         int count = 0;
-        for(List<ConsumerRecord<K,V>> recs: this.records.values())
+        for (List<ConsumerRecord<K, V>> recs: this.records.values())
             count += recs.size();
         return count;
     }
 
-    private static class ConcatenatedIterable<K,V> implements Iterable<ConsumerRecord<K,V>> {
+    private static class ConcatenatedIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
 
-        private final Iterable<? extends Iterable<ConsumerRecord<K,V>>> iterables;
+        private final Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables;
 
-        public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K,V>>> iterables) {
+        public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables) {
             this.iterables = iterables;
         }
 
         @Override
-        public Iterator<ConsumerRecord<K,V>> iterator() {
-            return new AbstractIterator<ConsumerRecord<K,V>>() {
-                Iterator<? extends Iterable<ConsumerRecord<K,V>>> iters = iterables.iterator();
-                Iterator<ConsumerRecord<K,V>> current;
+        public Iterator<ConsumerRecord<K, V>> iterator() {
+            return new AbstractIterator<ConsumerRecord<K, V>>() {
+                Iterator<? extends Iterable<ConsumerRecord<K, V>>> iters = iterables.iterator();
+                Iterator<ConsumerRecord<K, V>> current;
 
-                public ConsumerRecord<K,V> makeNext() {
+                public ConsumerRecord<K, V> makeNext() {
                     if (current == null || !current.hasNext()) {
                         if (iters.hasNext())
                             current = iters.next().iterator();

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 300c551..09a6f11 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -30,13 +30,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.ConnectionState;
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.clients.consumer.internals.Heartbeat;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -78,7 +78,6 @@ import org.apache.kafka.common.requests.OffsetFetchRequest;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
-import org.apache.kafka.common.utils.ClientUtils;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -380,7 +379,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
     private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
     private static final long LATEST_OFFSET_TIMESTAMP = -1L;
-    private static final AtomicInteger consumerAutoId = new AtomicInteger(1);
+    private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
 
     private final Time time;
     private final ConsumerMetrics metrics;
@@ -547,15 +546,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                                                                   TimeUnit.MILLISECONDS);
         String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
         String jmxPrefix = "kafka.consumer";
-        if(clientId .length() <= 0)
-          clientId = "consumer-" + consumerAutoId.getAndIncrement();
+        if (clientId.length() <= 0)
+          clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
         List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                                                         MetricsReporter.class);
         reporters.add(new JmxReporter(jmxPrefix));
         Metrics metrics = new Metrics(metricConfig, reporters, time);
         this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
-        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
         this.metadata.update(Cluster.bootstrap(addresses), 0);
 
         String metricsGroup = "consumer";
@@ -1554,23 +1553,31 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                                         "The maximum lag for any partition in this window",
                                         tags), new Max());
 
+            Measurable numParts = 
+                new Measurable() {
+                    public double measure(MetricConfig config, long now) {
+                        return subscriptions.assignedPartitions().size();
+                    }
+                };
             metrics.addMetric(new MetricName("assigned-partitions",
                                              metricsGroup,
                                              "The number of partitions currently assigned to this consumer",
-                                             tags), new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return subscriptions.assignedPartitions().size();
-                }
-            });
-
+                                             tags),
+                              numParts);
+                              
+            
+            Measurable lastHeartbeat =                               
+                new Measurable() {
+                    public double measure(MetricConfig config, long now) {
+                        return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
+                    }
+                };
             metrics.addMetric(new MetricName("last-heartbeat-seconds-ago",
                                              metricsGroup,
                                              "The number of seconds since the last controller heartbeat",
-                                             tags), new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return TimeUnit.SECONDS.convert(now - heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
-                }
-            });
+                                             tags), 
+                                             
+                              lastHeartbeat);
         }
 
         public void recordTopicFetchMetrics(String topic, int bytes, int records) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
index d9483ec..ee0751e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
 package org.apache.kafka.clients.consumer.internals;
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
index 7e57a39..c06ab3a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.TopicPartition;
 public class NoOpConsumerRebalanceCallback implements ConsumerRebalanceCallback {
 
     @Override
-    public void onPartitionsAssigned(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {}
+    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
 
     @Override
-    public void onPartitionsRevoked(Consumer<?,?> consumer, Collection<TopicPartition> partitions) {}
+    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {}
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 71ce20d..d41d306 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
 package org.apache.kafka.clients.consumer.internals;
 
 import java.util.HashMap;
@@ -58,8 +70,8 @@ public class SubscriptionState {
             throw new IllegalStateException("Topic " + topic + " was never subscribed to.");
         this.subscribedTopics.remove(topic);
         this.needsPartitionAssignment = true;
-        for(TopicPartition tp: assignedPartitions())
-            if(topic.equals(tp.topic()))
+        for (TopicPartition tp: assignedPartitions())
+            if (topic.equals(tp.topic()))
                 clearPartition(tp);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ebc4c53..1fd6917 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -19,8 +19,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
-import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.clients.producer.internals.Partitioner;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
@@ -45,7 +46,6 @@ import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.ClientUtils;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
@@ -60,9 +60,10 @@ import org.slf4j.LoggerFactory;
  * The producer manages a single background thread that does I/O as well as a TCP connection to each of the brokers it
  * needs to communicate with. Failure to close the producer after use will leak these resources.
  */
-public class KafkaProducer<K,V> implements Producer<K,V> {
+public class KafkaProducer<K, V> implements Producer<K, V> {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
+    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
 
     private final Partitioner partitioner;
     private final int maxRequestSize;
@@ -79,7 +80,6 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
     private final Serializer<K> keySerializer;
     private final Serializer<V> valueSerializer;
     private final ProducerConfig producerConfig;
-    private static final AtomicInteger producerAutoId = new AtomicInteger(1);
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -154,6 +154,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
         return newProperties;
     }
 
+    @SuppressWarnings("unchecked")
     private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         log.trace("Starting the Kafka producer");
         this.producerConfig = config;
@@ -162,8 +163,8 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
                                                       .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
                                                                   TimeUnit.MILLISECONDS);
         String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
-        if(clientId.length() <= 0)
-          clientId = "producer-" + producerAutoId.getAndIncrement();
+        if (clientId.length() <= 0)
+          clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
         String jmxPrefix = "kafka.producer";
         List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                                                         MetricsReporter.class);
@@ -216,16 +217,16 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
             this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                               Serializer.class);
             this.keySerializer.configure(config.originals(), true);
-        }
-        else
+        } else {
             this.keySerializer = keySerializer;
+        }
         if (valueSerializer == null) {
             this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                 Serializer.class);
             this.valueSerializer.configure(config.originals(), false);
-        }
-        else
+        } else {
             this.valueSerializer = valueSerializer;
+        }
 
         config.logUnused();
         log.debug("Kafka producer started");
@@ -244,7 +245,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
      * @param record  The record to be sent
      */
     @Override
-    public Future<RecordMetadata> send(ProducerRecord<K,V> record) {
+    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
         return send(record, null);
     }
 
@@ -309,7 +310,7 @@ public class KafkaProducer<K,V> implements Producer<K,V> {
      *        indicates no callback)
      */
     @Override
-    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {
+    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         try {
             // first make sure the metadata for the topic is available
             waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 6b2471f..17fe541 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.MetricName;
  * @see KafkaProducer
  * @see MockProducer
  */
-public interface Producer<K,V> extends Closeable {
+public interface Producer<K, V> extends Closeable {
 
     /**
      * Send the given record asynchronously and return a future which will eventually contain the response information.
@@ -39,12 +39,12 @@ public interface Producer<K,V> extends Closeable {
      * @param record The record to send
      * @return A future which will eventually contain the response information
      */
-    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
+    public Future<RecordMetadata> send(ProducerRecord<K, V> record);
 
     /**
      * Send a record and invoke the given callback when the record has been acknowledged by the server
      */
-    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
+    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
 
     /**
      * Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 9a43d66..122375c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -35,7 +35,7 @@ public class ProducerConfig extends AbstractConfig {
      * CHANGE WILL BREAK USER CODE.
      */
 
-    private static final ConfigDef config;
+    private static final ConfigDef CONFIG;
 
     /** <code>bootstrap.servers</code> */
     public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
@@ -167,13 +167,13 @@ public class ProducerConfig extends AbstractConfig {
     private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface.";
 
     static {
-        config = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
                                 .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
                                 .define(ACKS_CONFIG,
                                         Type.STRING,
                                         "1",
-                                        in("all","-1", "0", "1"),
+                                        in("all", "-1", "0", "1"),
                                         Importance.HIGH,
                                         ACKS_DOC)
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", Importance.HIGH, COMPRESSION_TYPE_DOC)
@@ -218,11 +218,11 @@ public class ProducerConfig extends AbstractConfig {
     }
 
     ProducerConfig(Map<? extends Object, ? extends Object> props) {
-        super(config, props);
+        super(CONFIG, props);
     }
 
     public static void main(String[] args) {
-        System.out.println(config.toHtmlTable());
+        System.out.println(CONFIG.toHtmlTable());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
index 8d4156d..4cb1e50 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
@@ -80,11 +80,11 @@ public final class BufferPool {
         this.time = time;
         this.waitTime = this.metrics.sensor("bufferpool-wait-time");
         MetricName metricName = new MetricName("bufferpool-wait-ratio",
-                                              metricGrpName,
-                                              "The fraction of time an appender waits for space allocation.",
-                                              metricTags);
+                                               metricGrpName,
+                                               "The fraction of time an appender waits for space allocation.",
+                                               metricTags);
         this.waitTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
-   }
+    }
 
     /**
      * Allocate a buffer of the given size. This method blocks if there is not enough memory and the buffer pool

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
deleted file mode 100644
index 3aff624..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.producer.internals;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A class encapsulating some of the logic around metadata.
- * <p>
- * This class is shared by the client thread (for partitioning) and the background sender thread.
- * 
- * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
- * topic we don't have any metadata for it will trigger a metadata update.
- */
-public final class Metadata {
-
-    private static final Logger log = LoggerFactory.getLogger(Metadata.class);
-
-    private final long refreshBackoffMs;
-    private final long metadataExpireMs;
-    private int version;
-    private long lastRefreshMs;
-    private Cluster cluster;
-    private boolean needUpdate;
-    private final Set<String> topics;
-
-    /**
-     * Create a metadata instance with reasonable defaults
-     */
-    public Metadata() {
-        this(100L, 60 * 60 * 1000L);
-    }
-
-    /**
-     * Create a new Metadata instance
-     * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
-     *        polling
-     * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
-     */
-    public Metadata(long refreshBackoffMs, long metadataExpireMs) {
-        this.refreshBackoffMs = refreshBackoffMs;
-        this.metadataExpireMs = metadataExpireMs;
-        this.lastRefreshMs = 0L;
-        this.version = 0;
-        this.cluster = Cluster.empty();
-        this.needUpdate = false;
-        this.topics = new HashSet<String>();
-    }
-
-    /**
-     * Get the current cluster info without blocking
-     */
-    public synchronized Cluster fetch() {
-        return this.cluster;
-    }
-
-    /**
-     * Add the topic to maintain in the metadata
-     */
-    public synchronized void add(String topic) {
-        topics.add(topic);
-    }
-
-    /**
-     * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
-     * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
-     * is now
-     */
-    public synchronized long timeToNextUpdate(long nowMs) {
-        long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
-        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
-        return Math.max(timeToExpire, timeToAllowUpdate);
-    }
-
-    /**
-     * Request an update of the current cluster metadata info, return the current version before the update
-     */
-    public synchronized int requestUpdate() {
-        this.needUpdate = true;
-        return this.version;
-    }
-
-    /**
-     * Wait for metadata update until the current version is larger than the last version we know of
-     */
-    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) {
-        if (maxWaitMs < 0) {
-            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
-        }
-        long begin = System.currentTimeMillis();
-        long remainingWaitMs = maxWaitMs;
-        while (this.version <= lastVersion) {
-            try {
-                if (remainingWaitMs != 0) {
-                    wait(remainingWaitMs);
-                }
-            } catch (InterruptedException e) { /* this is fine */
-            }
-            long elapsed = System.currentTimeMillis() - begin;
-            if (elapsed >= maxWaitMs)
-                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
-            remainingWaitMs = maxWaitMs - elapsed;
-        }
-    }
-
-    /**
-     * Add one or more topics to maintain metadata for
-     */
-    public synchronized void addTopics(String... topics) {
-        for (String topic : topics)
-            this.topics.add(topic);
-        requestUpdate();
-    }
-
-    /**
-     * Get the list of topics we are currently maintaining metadata for
-     */
-    public synchronized Set<String> topics() {
-        return new HashSet<String>(this.topics);
-    }
-
-    /**
-     * Update the cluster metadata
-     */
-    public synchronized void update(Cluster cluster, long now) {
-        this.needUpdate = false;
-        this.lastRefreshMs = now;
-        this.version += 1;
-        this.cluster = cluster;
-        notifyAll();
-        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
-    }
-    
-    /**
-     * @return The current metadata version
-     */
-    public synchronized int version() {
-        return this.version;
-    }
-
-    /**
-     * The last time metadata was updated.
-     */
-    public synchronized long lastUpdate() {
-        return this.lastRefreshMs;
-    }
-
-    /**
-     * The metadata refresh backoff in ms
-     */
-    public long refreshBackoff() {
-        return refreshBackoffMs;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
index b70ece7..8e5855d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProduceRequestResult.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.producer.internals;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 50889e4..ecfe214 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -102,26 +102,27 @@ public final class RecordAccumulator {
     private void registerMetrics(Metrics metrics, String metricGrpName, Map<String, String> metricTags) {
 
         MetricName metricName = new MetricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records", metricTags);
-        metrics.addMetric(metricName,
-                          new Measurable() {
-                              public double measure(MetricConfig config, long now) {
-                                  return free.queued();
-                              }
-                          });
+        Measurable waitingThreads = new Measurable() {
+            public double measure(MetricConfig config, long now) {
+                return free.queued();
+            }
+        };
+        metrics.addMetric(metricName, waitingThreads);
+                 
         metricName = new MetricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).", metricTags);
-        metrics.addMetric(metricName,
-                          new Measurable() {
-                              public double measure(MetricConfig config, long now) {
-                                  return free.totalMemory();
-                              }
-                          });
+        Measurable totalBytes = new Measurable() {
+            public double measure(MetricConfig config, long now) {
+                return free.totalMemory();
+            }
+        };
+        metrics.addMetric(metricName, totalBytes);
         metricName = new MetricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).", metricTags);
-        metrics.addMetric(metricName,
-                          new Measurable() {
-                              public double measure(MetricConfig config, long now) {
-                                  return free.availableMemory();
-                              }
-                          });
+        Measurable availableBytes = new Measurable() {
+            public double measure(MetricConfig config, long now) {
+                return free.availableMemory();
+            }
+        };
+        metrics.addMetric(metricName, availableBytes);
     }
 
     /**
@@ -228,8 +229,7 @@ public final class RecordAccumulator {
                         boolean sendable = full || expired || exhausted || closed;
                         if (sendable && !backingOff) {
                             readyNodes.add(leader);
-                        }
-                        else {
+                        } else {
                             // Note that this results in a conservative estimate since an un-sendable partition may have
                             // a leader that will later be found to have sendable data. However, this is good enough
                             // since we'll just wake up and then sleep again for the remaining time.

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8726809..ed9c63a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index 689bae9..13f4d59 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -45,12 +45,12 @@ public class ProducerPerformance {
         }
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[],byte[]>(props);
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
 
         /* setup perf test */
         byte[] payload = new byte[recordSize];
         Arrays.fill(payload, (byte) 1);
-        ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>(topicName, payload);
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
         long sleepTime = NS_PER_SEC / throughput;
         long sleepDeficitNs = 0;
         Stats stats = new Stats(numRecords, 5000);
@@ -66,8 +66,8 @@ public class ProducerPerformance {
              * and then make up the whole deficit in one longer sleep.
              */
             if (throughput > 0) {
-                float elapsed = (sendStart - start)/1000.f;
-                if (elapsed > 0 && i/elapsed > throughput) {
+                float elapsed = (sendStart - start) / 1000.f;
+                if (elapsed > 0 && i / elapsed > throughput) {
                     sleepDeficitNs += sleepTime;
                     if (sleepDeficitNs >= MIN_SLEEP_NS) {
                         long sleepMs = sleepDeficitNs / 1000000;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index d7ccbcd..8fcd291 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -40,8 +40,8 @@ public final class Cluster {
         this.nodes = Collections.unmodifiableList(copy);
         
         this.nodesById = new HashMap<Integer, Node>();
-        for(Node node: nodes)
-          this.nodesById.put(node.id(), node);
+        for (Node node: nodes)
+            this.nodesById.put(node.id(), node);
 
         // index the partitions by topic/partition for quick lookup
         this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/MetricName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java
index 7e977e9..04b4a09 100644
--- a/clients/src/main/java/org/apache/kafka/common/MetricName.java
+++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java
@@ -90,8 +90,8 @@ public final class MetricName {
             throw new IllegalArgumentException("keyValue needs to be specified in paris");
         Map<String, String> tags = new HashMap<String, String>();
 
-        for (int i=0; i<(keyValue.length / 2); i++)
-            tags.put(keyValue[i], keyValue[i+1]);
+        for (int i = 0; i < keyValue.length / 2; i++)
+            tags.put(keyValue[i], keyValue[i + 1]);
         return tags;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
index 28562f9..321da8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
+++ b/clients/src/main/java/org/apache/kafka/common/PartitionInfo.java
@@ -72,7 +72,7 @@ public class PartitionInfo {
         return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s",
                              topic,
                              partition,
-                             leader == null? "none" : leader.id(),
+                             leader == null ? "none" : leader.id(),
                              fmtNodeIds(replicas),
                              fmtNodeIds(inSyncReplicas));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 38ce10b..8523333 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -53,6 +53,7 @@ public class ConfigDef {
 
     /**
      * Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
+     * 
      * @return new unmodifiable {@link Set} instance containing the keys
      */
     public Set<String> names() {
@@ -61,6 +62,7 @@ public class ConfigDef {
 
     /**
      * Define a new configuration
+     * 
      * @param name The name of the config parameter
      * @param type The type of the config
      * @param defaultValue The default value to use if this config isn't present
@@ -69,16 +71,23 @@ public class ConfigDef {
      * @param documentation The documentation string for the config
      * @return This ConfigDef so you can chain calls
      */
-    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
+    public ConfigDef define(String name,
+                            Type type,
+                            Object defaultValue,
+                            Validator validator,
+                            Importance importance,
+                            String documentation) {
         if (configKeys.containsKey(name))
             throw new ConfigException("Configuration " + name + " is defined twice.");
-        Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
+        Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE
+                : parseType(name, defaultValue, type);
         configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation));
         return this;
     }
 
     /**
      * Define a new configuration with no special validation logic
+     * 
      * @param name The name of the config parameter
      * @param type The type of the config
      * @param defaultValue The default value to use if this config isn't present
@@ -92,6 +101,7 @@ public class ConfigDef {
 
     /**
      * Define a required parameter with no default value
+     * 
      * @param name The name of the config parameter
      * @param type The type of the config
      * @param validator A validator to use in checking the correctness of the config
@@ -105,6 +115,7 @@ public class ConfigDef {
 
     /**
      * Define a required parameter with no default value and no special validation logic
+     * 
      * @param name The name of the config parameter
      * @param type The type of the config
      * @param importance The importance of this config: is this something you will likely need to change.
@@ -120,6 +131,7 @@ public class ConfigDef {
      * that the keys of the map are strings, but the values can either be strings or they may already be of the
      * appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
      * programmatically constructed map.
+     * 
      * @param props The configs to parse and validate
      * @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
      *         the appropriate type (int, string, etc)
@@ -132,7 +144,8 @@ public class ConfigDef {
             if (props.containsKey(key.name))
                 value = parseType(key.name, props.get(key.name), key.type);
             else if (key.defaultValue == NO_DEFAULT_VALUE)
-                throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
+                throw new ConfigException("Missing required configuration \"" + key.name
+                        + "\" which has no default value.");
             else
                 value = key.defaultValue;
             if (key.validator != null)
@@ -144,6 +157,7 @@ public class ConfigDef {
 
     /**
      * Parse a value according to its expected type.
+     * 
      * @param name The config name
      * @param value The config value
      * @param type The expected type
@@ -157,14 +171,13 @@ public class ConfigDef {
             switch (type) {
                 case BOOLEAN:
                     if (value instanceof String) {
-                    	if (trimmed.equalsIgnoreCase("true"))
-                    		return true;
-                    	else if (trimmed.equalsIgnoreCase("false"))
-                    		return false;
-                    	else
-                    		throw new ConfigException(name, value, "Expected value to be either true or false");
-                    }
-                    else if (value instanceof Boolean)
+                        if (trimmed.equalsIgnoreCase("true"))
+                            return true;
+                        else if (trimmed.equalsIgnoreCase("false"))
+                            return false;
+                        else
+                            throw new ConfigException(name, value, "Expected value to be either true or false");
+                    } else if (value instanceof Boolean)
                         return value;
                     else
                         throw new ConfigException(name, value, "Expected value to be either true or false");
@@ -172,7 +185,8 @@ public class ConfigDef {
                     if (value instanceof String)
                         return trimmed;
                     else
-                        throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
+                        throw new ConfigException(name, value, "Expected value to be a string, but it was a "
+                                + value.getClass().getName());
                 case INT:
                     if (value instanceof Integer) {
                         return (Integer) value;
@@ -256,6 +270,7 @@ public class ConfigDef {
 
         /**
          * A numeric range that checks only the lower bound
+         * 
          * @param min The minimum acceptable value
          */
         public static Range atLeast(Number min) {
@@ -287,32 +302,30 @@ public class ConfigDef {
         }
     }
 
-  public static class ValidString implements Validator {
-    List<String> validStrings;
+    public static class ValidString implements Validator {
+        private final List<String> validStrings;
 
-    private ValidString(List<String> validStrings) {
-      this.validStrings = validStrings;
-    }
+        private ValidString(List<String> validStrings) {
+            this.validStrings = validStrings;
+        }
 
-    public static ValidString in(String... validStrings) {
-      return new ValidString(Arrays.asList(validStrings));
-    }
+        public static ValidString in(String... validStrings) {
+            return new ValidString(Arrays.asList(validStrings));
+        }
 
-    @Override
-    public void ensureValid(String name, Object o) {
-      String s = (String) o;
-      if (!validStrings.contains(s)) {
-        throw new ConfigException(name,o,"String must be one of: " + Utils.join(validStrings, ", "));
-      }
+        @Override
+        public void ensureValid(String name, Object o) {
+            String s = (String) o;
+            if (!validStrings.contains(s))
+                throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
+        }
 
-    }
+        public String toString() {
+            return "[" + Utils.join(validStrings, ", ") + "]";
+        }
 
-    public String toString() {
-      return "[" + Utils.join(validStrings, ", ") + "]";
     }
 
-  }
-
     private static class ConfigKey {
         public final String name;
         public final Type type;
@@ -321,7 +334,12 @@ public class ConfigDef {
         public final Validator validator;
         public final Importance importance;
 
-        public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
+        public ConfigKey(String name,
+                         Type type,
+                         Object defaultValue,
+                         Validator validator,
+                         Importance importance,
+                         String documentation) {
             super();
             this.name = name;
             this.type = type;

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
index 75c80a9..a6107b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasAfterAppendException.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -17,27 +17,26 @@
 package org.apache.kafka.common.errors;
 
 /**
- * Number of insync replicas for the partition is lower than min.insync.replicas
- * This exception is raised when the low ISR size is discovered *after* the message
- * was already appended to the log. Producer retries will cause duplicates.
+ * Number of insync replicas for the partition is lower than min.insync.replicas. This exception is raised when the low
+ * ISR size is discovered *after* the message was already appended to the log. Producer retries will cause duplicates.
  */
 public class NotEnoughReplicasAfterAppendException extends RetriableException {
-  private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-  public NotEnoughReplicasAfterAppendException() {
-    super();
-  }
+    public NotEnoughReplicasAfterAppendException() {
+        super();
+    }
 
-  public NotEnoughReplicasAfterAppendException(String message, Throwable cause) {
-    super(message,cause);
-  }
+    public NotEnoughReplicasAfterAppendException(String message, Throwable cause) {
+        super(message, cause);
+    }
 
-  public NotEnoughReplicasAfterAppendException(String message) {
-    super(message);
-  }
+    public NotEnoughReplicasAfterAppendException(String message) {
+        super(message);
+    }
 
-  public NotEnoughReplicasAfterAppendException(Throwable cause) {
-    super(cause);
-  }
+    public NotEnoughReplicasAfterAppendException(Throwable cause) {
+        super(cause);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
index 486d515..1573227 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotEnoughReplicasException.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -20,21 +20,21 @@ package org.apache.kafka.common.errors;
  * Number of insync replicas for the partition is lower than min.insync.replicas
  */
 public class NotEnoughReplicasException extends RetriableException {
-  private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 1L;
 
-  public NotEnoughReplicasException() {
-    super();
-  }
+    public NotEnoughReplicasException() {
+        super();
+    }
 
-  public NotEnoughReplicasException(String message, Throwable cause) {
-    super(message, cause);
-  }
+    public NotEnoughReplicasException(String message, Throwable cause) {
+        super(message, cause);
+    }
 
-  public NotEnoughReplicasException(String message) {
-    super(message);
-  }
+    public NotEnoughReplicasException(String message) {
+        super(message);
+    }
 
-  public NotEnoughReplicasException(Throwable cause) {
-    super(cause);
-  }
+    public NotEnoughReplicasException(Throwable cause) {
+        super(cause);
+    }
 }