Posted to commits@kafka.apache.org by jk...@apache.org on 2014/03/28 04:53:54 UTC

[1/2] KAFKA-1251: Add metrics to the producer.

Repository: kafka
Updated Branches:
  refs/heads/trunk 9bc47bc13 -> 23d7fc470


http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 12c9500..a2b7722 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -70,6 +70,7 @@ public class SenderTest {
                                        REQUEST_TIMEOUT_MS,
                                        SEND_BUFFER_SIZE,
                                        RECEIVE_BUFFER_SIZE,
+                                       metrics,
                                        time);
 
     @Before
@@ -114,6 +115,7 @@ public class SenderTest {
                                    REQUEST_TIMEOUT_MS,
                                    SEND_BUFFER_SIZE,
                                    RECEIVE_BUFFER_SIZE,
+                                   new Metrics(),
                                    time);
         Future<RecordMetadata> future = accumulator.append(tp, "key".getBytes(), "value".getBytes(), CompressionType.NONE, null);
         RequestSend request1 = completeSend(sender);

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index fdd8914..9ff73f4 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics;
 
@@ -22,25 +18,16 @@ import static org.junit.Assert.fail;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
-
 import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.Quota;
-import org.apache.kafka.common.metrics.QuotaViolationException;
-import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Percentile;
 import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Total;
-import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.Test;
 
@@ -154,9 +141,10 @@ public class MetricsTest {
     public void testOldDataHasNoEffect() {
         Max max = new Max();
         long windowMs = 100;
-        MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS);
+        int samples = 2;
+        MetricConfig config = new MetricConfig().timeWindow(windowMs, TimeUnit.MILLISECONDS).samples(samples);
         max.record(config, 50, time.nanoseconds());
-        time.sleep(windowMs);
+        time.sleep(samples * windowMs);
         assertEquals(Double.NEGATIVE_INFINITY, max.measure(config, time.nanoseconds()), EPS);
     }
 

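For reference, the behaviour the updated test asserts is that a sampled stat only discards old values once every configured sample window has elapsed, hence the sleep of samples * windowMs. A minimal sketch of that behaviour, assuming the same classes MetricsTest already imports (MetricConfig, Max, MockTime, TimeUnit):

    MockTime time = new MockTime();
    MetricConfig config = new MetricConfig().timeWindow(100, TimeUnit.MILLISECONDS).samples(2);
    Max max = new Max();
    max.record(config, 50, time.nanoseconds());
    time.sleep(2 * 100);   // sleep past samples * windowMs so every sample expires
    // With all samples expired the measured max falls back to "no data":
    double value = max.measure(config, time.nanoseconds());   // Double.NEGATIVE_INFINITY
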
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 90e2dcf..99856e9 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.network;
 
@@ -27,16 +23,12 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.nio.ByteBuffer;
-import java.nio.channels.UnresolvedAddressException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-
-import org.apache.kafka.common.network.NetworkReceive;
-import org.apache.kafka.common.network.NetworkSend;
-import org.apache.kafka.common.network.Selectable;
-import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -58,7 +50,7 @@ public class SelectorTest {
     public void setup() throws Exception {
         this.server = new EchoServer();
         this.server.start();
-        this.selector = new Selector();
+        this.selector = new Selector(new Metrics(), new MockTime());
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
index 7239b4a..9d98c11 100644
--- a/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
+++ b/clients/src/test/java/org/apache/kafka/test/MetricsBench.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.test;
 
@@ -27,7 +23,6 @@ import org.apache.kafka.common.metrics.stats.Percentile;
 import org.apache.kafka.common.metrics.stats.Percentiles;
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
 
-
 public class MetricsBench {
 
     public static void main(String[] args) {
@@ -48,7 +43,7 @@ public class MetricsBench {
         }
         long start = System.nanoTime();
         for (int i = 0; i < iters; i++)
-            child.record(i);
+            parent.record(i);
         double ellapsed = (System.nanoTime() - start) / (double) iters;
         System.out.println(String.format("%.2f ns per metric recording.", ellapsed));
     }

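The benchmark fix records against the parent sensor rather than the leaf. For comparison, a self-contained sketch of this kind of recording micro-benchmark, using only the Metrics/Sensor calls exercised elsewhere in this patch (the sensor and metric names here are illustrative, not taken from MetricsBench):

    Metrics metrics = new Metrics();
    Sensor sensor = metrics.sensor("bench");             // illustrative sensor name
    sensor.add("bench.avg", new Avg());
    sensor.add("bench.max", new Max());
    int iters = 1000000;
    long start = System.nanoTime();
    for (int i = 0; i < iters; i++)
        sensor.record(i);
    double elapsed = (System.nanoTime() - start) / (double) iters;
    System.out.println(String.format("%.2f ns per metric recording.", elapsed));
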
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
index 46cf86e..6aab854 100644
--- a/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
+++ b/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.test;
 
@@ -24,11 +20,11 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.SystemTime;
 
-
 public class Microbenchmarks {
 
     public static void main(String[] args) throws Exception {
@@ -112,6 +108,43 @@ public class Microbenchmarks {
         t1.join();
         t2.join();
 
+        System.out.println("Testing locks");
+        done.set(false);
+        final ReentrantLock lock2 = new ReentrantLock();
+        Thread t3 = new Thread() {
+            public void run() {
+                time.sleep(1);
+                int counter = 0;
+                long start = time.nanoseconds();
+                for (int i = 0; i < iters; i++) {
+                    lock2.lock();
+                    counter++;
+                    lock2.unlock();
+                }
+                System.out.println("lock: " + ((System.nanoTime() - start) / iters));
+                System.out.println(counter);
+                done.set(true);
+            }
+        };
+
+        Thread t4 = new Thread() {
+            public void run() {
+                int counter = 0;
+                while (!done.get()) {
+                    time.sleep(1);
+                    lock2.lock();
+                    counter++;
+                    lock2.unlock();
+                }
+                System.out.println("Counter: " + counter);
+            }
+        };
+
+        t3.start();
+        t4.start();
+        t3.join();
+        t4.join();
+
         Map<String, Integer> values = new HashMap<String, Integer>();
         for (int i = 0; i < 100; i++)
             values.put(Integer.toString(i), i);

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index baa698b..9502254 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -41,7 +41,7 @@ log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 
 log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
 log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
-log4j.appender.cleanerAppender.File=log-cleaner.log
+log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
 log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
 log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
 


[2/2] git commit: KAFKA-1251: Add metrics to the producer.

Posted by jk...@apache.org.
KAFKA-1251: Add metrics to the producer.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23d7fc47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23d7fc47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23d7fc47

Branch: refs/heads/trunk
Commit: 23d7fc470638c4dffa5ca005ef2e3d34c14dc92e
Parents: 9bc47bc
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Mar 18 17:38:56 2014 -0700
Committer: Jay Kreps <ja...@gmail.com>
Committed: Thu Mar 27 20:52:42 2014 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  46 ++--
 .../kafka/clients/producer/ProducerConfig.java  |  15 +-
 .../internals/FutureRecordMetadata.java         |  25 +--
 .../producer/internals/RecordAccumulator.java   |  37 ++--
 .../clients/producer/internals/RecordBatch.java |   6 +-
 .../clients/producer/internals/Sender.java      | 218 ++++++++++++++++---
 .../clients/tools/ProducerPerformance.java      |  39 ++--
 .../apache/kafka/common/metrics/Metrics.java    |  61 ++++--
 .../org/apache/kafka/common/metrics/Sensor.java |  45 ++--
 .../kafka/common/metrics/stats/Percentiles.java |   2 +-
 .../apache/kafka/common/metrics/stats/Rate.java |  42 ++--
 .../kafka/common/metrics/stats/SampledStat.java |  49 +++--
 .../kafka/common/network/ByteBufferSend.java    |  30 +--
 .../kafka/common/network/NetworkReceive.java    |  24 +-
 .../apache/kafka/common/network/Selector.java   | 109 +++++++++-
 .../kafka/common/utils/CopyOnWriteMap.java      |  26 +--
 .../kafka/clients/producer/MetadataTest.java    |  24 +-
 .../clients/producer/RecordAccumulatorTest.java |   8 +-
 .../kafka/clients/producer/SenderTest.java      |   2 +
 .../kafka/common/metrics/MetricsTest.java       |  40 ++--
 .../kafka/common/network/SelectorTest.java      |  34 ++-
 .../org/apache/kafka/test/MetricsBench.java     |  27 +--
 .../org/apache/kafka/test/Microbenchmarks.java  |  63 ++++--
 config/log4j.properties                         |   2 +-
 24 files changed, 643 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 1ff9174..8c1c575 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -39,12 +39,15 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,6 +73,7 @@ public class KafkaProducer implements Producer {
     private final Metrics metrics;
     private final Thread ioThread;
     private final CompressionType compressionType;
+    private final Sensor errors;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -91,9 +95,14 @@ public class KafkaProducer implements Producer {
 
     private KafkaProducer(ProducerConfig config) {
         log.trace("Starting the Kafka producer");
-        this.metrics = new Metrics(new MetricConfig(),
-                                   Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer.")),
-                                   new SystemTime());
+        Time time = new SystemTime();
+        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES))
+                                                      .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS),
+                                                                  TimeUnit.MILLISECONDS);
+        String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
+        String jmxPrefix = "kafka.producer." + (clientId.length() > 0 ? clientId + "." : "");
+        List<MetricsReporter> reporters = Collections.singletonList((MetricsReporter) new JmxReporter(jmxPrefix));
+        this.metrics = new Metrics(metricConfig, reporters, time);
         this.partitioner = new Partitioner();
         this.metadataFetchTimeoutMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
         this.metadata = new Metadata(config.getLong(ProducerConfig.METADATA_FETCH_BACKOFF_CONFIG),
@@ -107,13 +116,13 @@ public class KafkaProducer implements Producer {
                                                  config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                                                  config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                                                  metrics,
-                                                 new SystemTime());
+                                                 time);
         List<InetSocketAddress> addresses = parseAndValidateAddresses(config.getList(ProducerConfig.BROKER_LIST_CONFIG));
-        this.metadata.update(Cluster.bootstrap(addresses), System.currentTimeMillis());
-        this.sender = new Sender(new Selector(),
+        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
+        this.sender = new Sender(new Selector(this.metrics, time),
                                  this.metadata,
                                  this.accumulator,
-                                 config.getString(ProducerConfig.CLIENT_ID_CONFIG),
+                                 clientId,
                                  config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                                  config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                                  (short) config.getInt(ProducerConfig.REQUIRED_ACKS_CONFIG),
@@ -121,9 +130,14 @@ public class KafkaProducer implements Producer {
                                  config.getInt(ProducerConfig.REQUEST_TIMEOUT_CONFIG),
                                  config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                                  config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
+                                 this.metrics,
                                  new SystemTime());
         this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true);
         this.ioThread.start();
+
+        this.errors = this.metrics.sensor("errors");
+        this.errors.add("message-error-rate", "The average number of errors per second returned to the client.", new Rate());
+
         config.logUnused();
         log.debug("Kafka producer started");
     }
@@ -223,7 +237,8 @@ public class KafkaProducer implements Producer {
         try {
             Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
             int partition = partitioner.partition(record, cluster);
-            ensureValidSize(record.key(), record.value());
+            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value());
+            ensureValidRecordSize(serializedSize);
             TopicPartition tp = new TopicPartition(record.topic(), partition);
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
             FutureRecordMetadata future = accumulator.append(tp, record.key(), record.value(), compressionType, callback);
@@ -235,24 +250,25 @@ public class KafkaProducer implements Producer {
             log.debug("Exception occurred during message send:", e);
             if (callback != null)
                 callback.onCompletion(null, e);
+            this.errors.record();
             return new FutureFailure(e);
         } catch (InterruptedException e) {
+            this.errors.record();
             throw new KafkaException(e);
         }
     }
 
     /**
-     * Check that this key-value pair will have a serialized size small enough
+     * Validate that the record size isn't too large
      */
-    private void ensureValidSize(byte[] key, byte[] value) {
-        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
-        if (serializedSize > this.maxRequestSize)
-            throw new RecordTooLargeException("The message is " + serializedSize
+    private void ensureValidRecordSize(int size) {
+        if (size > this.maxRequestSize)
+            throw new RecordTooLargeException("The message is " + size
                                               + " bytes when serialized which is larger than the maximum request size you have configured with the "
                                               + ProducerConfig.MAX_REQUEST_SIZE_CONFIG
                                               + " configuration.");
-        if (serializedSize > this.totalMemorySize)
-            throw new RecordTooLargeException("The message is " + serializedSize
+        if (size > this.totalMemorySize)
+            throw new RecordTooLargeException("The message is " + size
                                               + " bytes when serialized which is larger than the total memory buffer you have configured with the "
                                               + ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG
                                               + " configuration.");

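Condensed, the new wiring above does three things: build a MetricConfig from the sampling settings, attach a JmxReporter under a client-id-aware prefix, and register an "errors" sensor that send() bumps on every failure. A sketch using only calls visible in the diff, with the sample values hard-coded for illustration:

    Time time = new SystemTime();
    MetricConfig metricConfig = new MetricConfig().samples(2)
                                                  .timeWindow(30000, TimeUnit.MILLISECONDS);
    List<MetricsReporter> reporters =
            Collections.singletonList((MetricsReporter) new JmxReporter("kafka.producer."));
    Metrics metrics = new Metrics(metricConfig, reporters, time);

    Sensor errors = metrics.sensor("errors");
    errors.add("message-error-rate",
               "The average number of errors per second returned to the client.",
               new Rate());

    // Later, when a send fails:
    errors.record();
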
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 48706ba..259c14b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -141,6 +141,17 @@ public class ProducerConfig extends AbstractConfig {
     public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
 
     /**
+     * The window size for a single metrics sample in ms. Defaults to 30 seconds.
+     */
+    public static final String METRICS_SAMPLE_WINDOW_MS = "metrics.sample.window.ms";
+
+    /**
+     * The number of samples used when reporting metrics. Defaults to two. So by default we use two 30 second windows,
+     * so metrics are computed over up to 60 seconds.
+     */
+    public static final String METRICS_NUM_SAMPLES = "metrics.num.samples";
+
+    /**
      * Should we register the Kafka metrics as JMX mbeans?
      */
     public static final String ENABLE_JMX_CONFIG = "enable.jmx";
@@ -166,7 +177,9 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "")
                                 .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), "blah blah")
                                 .define(COMPRESSION_TYPE_CONFIG, Type.STRING, "none", "blah blah")
-                                .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "");
+                                .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
+                                .define(METRICS_SAMPLE_WINDOW_MS, Type.LONG, 30000, atLeast(0), "")
+                                .define(METRICS_NUM_SAMPLES, Type.INT, 2, atLeast(1), "");
     }
 
     ProducerConfig(Map<? extends Object, ? extends Object> props) {

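The two new keys expose the sampling parameters to users: metrics.sample.window.ms is the length of a single sample window (default 30000 ms) and metrics.num.samples is the number of windows retained (default 2), so out of the box a rate or average reflects roughly the last 60 seconds. A hedged example of overriding them; it assumes the key-value-pair construction described in the KafkaProducer javadoc and omits the other required producer settings:

    // Assumes KafkaProducer exposes a constructor taking these key-value pairs,
    // as described in its class javadoc; other required settings are omitted.
    Properties props = new Properties();
    props.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS, "10000");   // 10 second windows
    props.put(ProducerConfig.METRICS_NUM_SAMPLES, "6");            // keep one minute of history
    Producer producer = new KafkaProducer(props);
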
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
index aec31c3..4a2da41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer.internals;
 
@@ -23,7 +19,6 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.kafka.clients.producer.RecordMetadata;
 
-
 /**
  * The future result of a record send
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 50bf95f..ffd13ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -28,8 +28,8 @@ import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -90,27 +90,32 @@ public final class RecordAccumulator {
     }
 
     private void registerMetrics(Metrics metrics) {
-        metrics.addMetric("blocked_threads",
-            "The number of user threads blocked waiting for buffer memory to enqueue their records",
-            new Measurable() {
+        metrics.addMetric("waiting-threads",
+                          "The number of user threads blocked waiting for buffer memory to enqueue their records",
+                          new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.queued();
                               }
                           });
-        metrics.addMetric("buffer_total_bytes",
-            "The total amount of buffer memory that is available (not currently used for buffering records).",
-            new Measurable() {
+        metrics.addMetric("buffer-total-bytes",
+                          "The maximum amount of buffer memory the client can use (whether or not it is currently used).",
+                          new Measurable() {
                               public double measure(MetricConfig config, long now) {
                                   return free.totalMemory();
                               }
                           });
-        metrics.addMetric("buffer_available_bytes",
-            "The total amount of buffer memory that is available (not currently used for buffering records).",
-            new Measurable() {
-                public double measure(MetricConfig config, long now) {
-                    return free.availableMemory();
-                }
-            });
+        metrics.addMetric("buffer-available-bytes",
+                          "The total amount of buffer memory that is not being used (either unallocated or in the free list).",
+                          new Measurable() {
+                              public double measure(MetricConfig config, long now) {
+                                  return free.availableMemory();
+                              }
+                          });
+        metrics.addMetric("ready-partitions", "The number of topic-partitions with buffered data ready to be sent.", new Measurable() {
+            public double measure(MetricConfig config, long now) {
+                return ready(now).size();
+            }
+        });
     }
 
     /**
@@ -226,10 +231,11 @@ public final class RecordAccumulator {
      * 
      * @param partitions The list of partitions to drain
      * @param maxSize The maximum number of bytes to drain
+     * @param now The current unix time
      * @return A list of {@link RecordBatch} for partitions specified with total size less than the requested maxSize.
      *         TODO: There may be a starvation issue due to iteration order
      */
-    public List<RecordBatch> drain(List<TopicPartition> partitions, int maxSize) {
+    public List<RecordBatch> drain(List<TopicPartition> partitions, int maxSize, long now) {
         if (partitions.isEmpty())
             return Collections.emptyList();
         int size = 0;
@@ -252,6 +258,7 @@ public final class RecordAccumulator {
                         batch.records.close();
                         size += batch.records.sizeInBytes();
                         ready.add(batch);
+                        batch.drained = now;
                     }
                 }
             }

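Unlike the sensor-based metrics elsewhere in this patch, the accumulator metrics above are gauges: each addMetric() call registers a Measurable whose measure() callback is evaluated on demand rather than recorded into. A minimal sketch of that pattern with an illustrative metric (the metric name and the backing queue are not from the patch):

    final Deque<Integer> queue = new ArrayDeque<Integer>();    // stand-in for the buffer pool state
    Metrics metrics = new Metrics();
    metrics.addMetric("queued-items",                           // illustrative metric name
                      "The number of items currently buffered.",
                      new Measurable() {
                          public double measure(MetricConfig config, long now) {
                              return queue.size();
                          }
                      });
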
http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 35f1d7a..94157f7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -18,6 +18,7 @@ import java.util.List;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,8 +32,10 @@ public final class RecordBatch {
     private static final Logger log = LoggerFactory.getLogger(RecordBatch.class);
 
     public int recordCount = 0;
+    public int maxRecordSize = 0;
     public volatile int attempts = 0;
     public final long created;
+    public long drained;
     public long lastAttempt;
     public final MemoryRecords records;
     public final TopicPartition topicPartition;
@@ -58,6 +61,7 @@ public final class RecordBatch {
             return null;
         } else {
             this.records.append(0L, key, value);
+            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
             FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
             if (callback != null)
                 thunks.add(new Thunk(callback, future));
@@ -70,7 +74,7 @@ public final class RecordBatch {
      * Complete the request
      * 
      * @param baseOffset The base offset of the messages assigned by the server
-     * @param exception The exception returned or null if no exception
+     * @param exception The exception that occurred (or null if the request was successful)
      */
     public void done(long baseOffset, RuntimeException exception) {
         this.produceFuture.done(topicPartition, baseOffset, exception);

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 565331d..d89813e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -24,13 +24,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidMetadataException;
-import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.NetworkSend;
 import org.apache.kafka.common.network.Selectable;
@@ -108,6 +115,9 @@ public class Sender implements Runnable {
     /* true while the sender thread is still running */
     private volatile boolean running;
 
+    /* metrics */
+    private final SenderMetrics sensors;
+
     public Sender(Selectable selector,
                   Metadata metadata,
                   RecordAccumulator accumulator,
@@ -119,6 +129,7 @@ public class Sender implements Runnable {
                   int requestTimeout,
                   int socketSendBuffer,
                   int socketReceiveBuffer,
+                  Metrics metrics,
                   Time time) {
         this.nodeStates = new NodeStates(reconnectBackoffMs);
         this.accumulator = accumulator;
@@ -137,6 +148,7 @@ public class Sender implements Runnable {
         this.metadataFetchInProgress = false;
         this.time = time;
         this.metadataFetchNodeIndex = new Random().nextInt();
+        this.sensors = new SenderMetrics(metrics);
     }
 
     /**
@@ -191,8 +203,9 @@ public class Sender implements Runnable {
         List<TopicPartition> sendable = processReadyPartitions(cluster, ready, now);
 
         // create produce requests
-        List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize);
-        List<InFlightRequest> requests = collate(cluster, batches);
+        List<RecordBatch> batches = this.accumulator.drain(sendable, this.maxRequestSize, now);
+        List<InFlightRequest> requests = collate(cluster, batches, now);
+        sensors.updateProduceRequestMetrics(requests);
 
         if (ready.size() > 0) {
             log.trace("Partitions with complete batches: {}", ready);
@@ -215,8 +228,8 @@ public class Sender implements Runnable {
 
         // handle responses, connections, and disconnections
         handleSends(this.selector.completedSends());
-        handleResponses(this.selector.completedReceives(), time.milliseconds());
-        handleDisconnects(this.selector.disconnected(), time.milliseconds());
+        handleResponses(this.selector.completedReceives(), now);
+        handleDisconnects(this.selector.disconnected(), now);
         handleConnects(this.selector.connected());
     }
 
@@ -234,7 +247,7 @@ public class Sender implements Runnable {
         if (nodeStates.isConnected(node.id())) {
             Set<String> topics = metadata.topics();
             this.metadataFetchInProgress = true;
-            InFlightRequest metadataRequest = metadataRequest(node.id(), topics);
+            InFlightRequest metadataRequest = metadataRequest(now, node.id(), topics);
             log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
             sends.add(metadataRequest.request);
             this.inFlightRequests.add(metadataRequest);
@@ -349,16 +362,9 @@ public class Sender implements Runnable {
                 ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey());
                 switch (requestKey) {
                     case PRODUCE:
-                        for (RecordBatch batch : request.batches.values()) {
-                            if (canRetry(batch, Errors.NETWORK_EXCEPTION)) {
-                                log.warn("Destination node disconnected for topic-partition {}, retrying ({} attempts left).",
-                                    batch.topicPartition, this.retries - batch.attempts - 1);
-                                this.accumulator.reenqueue(batch, now);
-                            } else {
-                                batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
-                                this.accumulator.deallocate(batch);
-                            }
-                        }
+                        int correlation = request.request.header().correlationId();
+                        for (RecordBatch batch : request.batches.values())
+                            completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, correlation, now);
                         break;
                     case METADATA:
                         metadataFetchInProgress = false;
@@ -408,6 +414,7 @@ public class Sender implements Runnable {
      * Handle responses from the server
      */
     private void handleResponses(List<NetworkReceive> receives, long now) {
+        long ns = time.nanoseconds();
         for (NetworkReceive receive : receives) {
             int source = receive.source();
             InFlightRequest req = inFlightRequests.nextCompleted(source);
@@ -420,12 +427,14 @@ public class Sender implements Runnable {
                 handleProduceResponse(req, req.request.header(), body, now);
             } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) {
                 log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header()
-                    .correlationId());
+                                                                                                                        .correlationId());
                 handleMetadataResponse(req.request.header(), body, now);
             } else {
                 throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey());
             }
+            this.sensors.recordLatency(receive.source(), now - req.created, ns);
         }
+
     }
 
     private void handleMetadataResponse(RequestHeader header, Struct body, long now) {
@@ -453,21 +462,39 @@ public class Sender implements Runnable {
                 if (error.exception() instanceof InvalidMetadataException)
                     metadata.forceUpdate();
                 RecordBatch batch = request.batches.get(tp);
-                if (canRetry(batch, error)) {
-                    // retry
-                    log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
-                        header.correlationId(), batch.topicPartition, this.retries - batch.attempts - 1, error);
-                    this.accumulator.reenqueue(batch, now);
-                } else {
-                    // tell the user the result of their request
-                    batch.done(response.baseOffset, error.exception());
-                    this.accumulator.deallocate(batch);
-                }
+                completeBatch(batch, error, response.baseOffset, header.correlationId(), now);
             }
         }
     }
 
     /**
+     * Complete or retry the given batch of records.
+     * @param batch The record batch
+     * @param error The error (or null if none)
+     * @param baseOffset The base offset assigned to the records if successful
+     * @param correlationId The correlation id for the request
+     * @param now The current time stamp
+     */
+    private void completeBatch(RecordBatch batch, Errors error, long baseOffset, long correlationId, long now) {
+        if (error != Errors.NONE && canRetry(batch, error)) {
+            // retry
+            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
+                     correlationId,
+                     batch.topicPartition,
+                     this.retries - batch.attempts - 1,
+                     error);
+            this.accumulator.reenqueue(batch, now);
+            this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
+        } else {
+            // tell the user the result of their request
+            batch.done(baseOffset, error.exception());
+            this.accumulator.deallocate(batch);
+            if (error != Errors.NONE)
+                this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
+        }
+    }
+
+    /**
      * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed
      */
     private boolean canRetry(RecordBatch batch, Errors error) {
@@ -488,16 +515,16 @@ public class Sender implements Runnable {
     /**
      * Create a metadata request for the given topics
      */
-    private InFlightRequest metadataRequest(int node, Set<String> topics) {
+    private InFlightRequest metadataRequest(long now, int node, Set<String> topics) {
         MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
         RequestSend send = new RequestSend(node, header(ApiKeys.METADATA), metadata.toStruct());
-        return new InFlightRequest(true, send, null);
+        return new InFlightRequest(now, true, send, null);
     }
 
     /**
      * Collate the record batches into a list of produce requests on a per-node basis
      */
-    private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches) {
+    private List<InFlightRequest> collate(Cluster cluster, List<RecordBatch> batches, long now) {
         Map<Integer, List<RecordBatch>> collated = new HashMap<Integer, List<RecordBatch>>();
         for (RecordBatch batch : batches) {
             Node node = cluster.leaderFor(batch.topicPartition);
@@ -510,14 +537,14 @@ public class Sender implements Runnable {
         }
         List<InFlightRequest> requests = new ArrayList<InFlightRequest>(collated.size());
         for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet())
-            requests.add(produceRequest(entry.getKey(), acks, requestTimeout, entry.getValue()));
+            requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()));
         return requests;
     }
 
     /**
      * Create a produce request from the given record batches
      */
-    private InFlightRequest produceRequest(int destination, short acks, int timeout, List<RecordBatch> batches) {
+    private InFlightRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) {
         Map<TopicPartition, RecordBatch> batchesByPartition = new HashMap<TopicPartition, RecordBatch>();
         Map<String, List<RecordBatch>> batchesByTopic = new HashMap<String, List<RecordBatch>>();
         for (RecordBatch batch : batches) {
@@ -552,7 +579,7 @@ public class Sender implements Runnable {
         produce.set("topic_data", topicDatas.toArray());
 
         RequestSend send = new RequestSend(destination, header(ApiKeys.PRODUCE), produce);
-        return new InFlightRequest(acks != 0, send, batchesByPartition);
+        return new InFlightRequest(now, acks != 0, send, batchesByPartition);
     }
 
     private RequestHeader header(ApiKeys key) {
@@ -641,16 +668,19 @@ public class Sender implements Runnable {
      * An request that hasn't been fully processed yet
      */
     private static final class InFlightRequest {
+        public long created;
         public boolean expectResponse;
         public Map<TopicPartition, RecordBatch> batches;
         public RequestSend request;
 
         /**
+         * @param created The unix timestamp for the time at which this request was created.
          * @param expectResponse Should we expect a response message or is this request complete once it is sent?
          * @param request The request
          * @param batches The record batches contained in the request if it is a produce request
          */
-        public InFlightRequest(boolean expectResponse, RequestSend request, Map<TopicPartition, RecordBatch> batches) {
+        public InFlightRequest(long created, boolean expectResponse, RequestSend request, Map<TopicPartition, RecordBatch> batches) {
+            this.created = created;
             this.batches = batches;
             this.request = request;
             this.expectResponse = expectResponse;
@@ -728,4 +758,124 @@ public class Sender implements Runnable {
         }
     }
 
+    /**
+     * A collection of sensors for the sender
+     */
+    private class SenderMetrics {
+
+        private final Metrics metrics;
+        public final Sensor retrySensor;
+        public final Sensor errorSensor;
+        public final Sensor queueTimeSensor;
+        public final Sensor requestTimeSensor;
+        public final Sensor recordsPerRequestSensor;
+        public final Sensor batchSizeSensor;
+        public final Sensor maxRecordSizeSensor;
+
+        public SenderMetrics(Metrics metrics) {
+            this.metrics = metrics;
+            this.batchSizeSensor = metrics.sensor("batch-size");
+            this.queueTimeSensor = metrics.sensor("queue-time");
+            this.requestTimeSensor = metrics.sensor("request-time");
+            this.recordsPerRequestSensor = metrics.sensor("records-per-request");
+            this.retrySensor = metrics.sensor("record-retries");
+            this.errorSensor = metrics.sensor("errors");
+            this.maxRecordSizeSensor = metrics.sensor("record-size-max");
+            this.batchSizeSensor.add("batch-size-avg", "The average number of bytes sent per partition per-request.", new Avg());
+            this.retrySensor.add("record-retry-rate", "The average per-second number of retried record sends", new Rate());
+            this.requestTimeSensor.add("request-latency-avg", "The average request latency in ms", new Avg());
+            this.requestTimeSensor.add("request-latency-max", "The maximum request latency in ms", new Max());
+            this.queueTimeSensor.add("record-queue-time-avg",
+                                     "The average time in ms record batches spent in the record accumulator.",
+                                     new Avg());
+            this.queueTimeSensor.add("record-queue-time-max",
+                                     "The maximum time in ms record batches spent in the record accumulator.",
+                                     new Max());
+            this.errorSensor.add("record-error-rate", "The average per-second number of record sends that resulted in errors", new Rate());
+            this.recordsPerRequestSensor.add("record-send-rate", "The average number of records sent per second.", new Rate());
+            this.recordsPerRequestSensor.add("records-per-request", "The average number of records per request.", new Avg());
+            this.maxRecordSizeSensor.add("record-size-max", "The maximum record size", new Max());
+            this.metrics.addMetric("requests-in-flight", "The current number of in-flight requests awaiting a response.", new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return inFlightRequests.totalInFlightRequests();
+                }
+            });
+            metrics.addMetric("metadata-age", "The age in seconds of the current producer metadata being used.", new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return (TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS) - metadata.lastUpdate()) / 1000.0;
+                }
+            });
+        }
+
+        public void updateProduceRequestMetrics(List<InFlightRequest> requests) {
+            long ns = time.nanoseconds();
+            for (int i = 0; i < requests.size(); i++) {
+                InFlightRequest request = requests.get(i);
+                int records = 0;
+                if (request.batches != null) {
+                    for (RecordBatch batch : request.batches.values()) {
+
+                        // per-topic record count
+                        String topicRecordsCountName = "topic." + batch.topicPartition.topic() + ".records-per-batch";
+                        Sensor topicRecordCount = this.metrics.getSensor(topicRecordsCountName);
+                        if (topicRecordCount == null) {
+                            topicRecordCount = this.metrics.sensor(topicRecordsCountName);
+                            topicRecordCount.add("topic." + batch.topicPartition.topic() + ".record-send-rate", new Rate());
+                        }
+                        topicRecordCount.record(batch.recordCount);
+
+                        // per-topic bytes-per-second
+                        String topicByteRateName = "topic." + batch.topicPartition.topic() + ".bytes";
+                        Sensor topicByteRate = this.metrics.getSensor(topicByteRateName);
+                        if (topicByteRate == null) {
+                            topicByteRate = this.metrics.sensor(topicByteRateName);
+                            topicByteRate.add("topic." + batch.topicPartition.topic() + ".byte-rate", new Rate());
+                        }
+                        topicByteRate.record(batch.records.sizeInBytes());
+
+                        this.batchSizeSensor.record(batch.records.sizeInBytes(), ns);
+                        this.queueTimeSensor.record(batch.drained - batch.created, ns);
+                        this.maxRecordSizeSensor.record(batch.maxRecordSize, ns);
+                        records += batch.recordCount;
+                    }
+                    this.recordsPerRequestSensor.record(records, ns);
+                }
+            }
+        }
+
+        public void recordRetries(String topic, int count) {
+            this.retrySensor.record(count);
+            String topicRetryName = "topic." + topic + ".record-retries";
+            Sensor topicRetrySensor = this.metrics.getSensor(topicRetryName);
+            if (topicRetrySensor == null) {
+                topicRetrySensor = this.metrics.sensor(topicRetryName);
+                topicRetrySensor.add("topic." + topic + ".record-retry-rate", new Rate());
+            }
+            topicRetrySensor.record(count);
+        }
+
+        public void recordErrors(String topic, int count) {
+            this.errorSensor.record(count);
+            String topicErrorName = "topic." + topic + ".record-errors";
+            Sensor topicErrorSensor = this.metrics.getSensor(topicErrorName);
+            if (topicErrorSensor == null) {
+                topicErrorSensor = this.metrics.sensor(topicErrorName);
+                topicErrorSensor.add("topic." + topic + ".record-error-rate", new Rate());
+            }
+            topicErrorSensor.record(count);
+        }
+
+        public void recordLatency(int node, long latency, long nowNs) {
+            this.requestTimeSensor.record(latency, nowNs);
+            String nodeTimeName = "server." + node + ".latency";
+            Sensor nodeRequestTime = this.metrics.getSensor(nodeTimeName);
+            if (nodeRequestTime == null) {
+                nodeRequestTime = this.metrics.sensor(nodeTimeName);
+                nodeRequestTime.add("node-" + node + ".latency-avg", new Avg());
+                nodeRequestTime.add("node-" + node + ".latency-max", new Max());
+            }
+            nodeRequestTime.record(latency, nowNs);
+        }
+    }
+
 }
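
The per-topic sensors above are created lazily: look up the sensor by name and create and populate it on first use. A standalone sketch of the same get-or-create pattern used by recordErrors() above (the wrapper class is hypothetical; the sensor and metric names mirror the ones in the patch):

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Rate;

    public class PerTopicSensors {
        private final Metrics metrics;

        public PerTopicSensors(Metrics metrics) {
            this.metrics = metrics;
        }

        // Returns the per-topic error sensor, registering it on first use.
        public Sensor errorSensorFor(String topic) {
            String name = "topic." + topic + ".record-errors";
            Sensor sensor = metrics.getSensor(name);   // null if not yet registered
            if (sensor == null) {
                sensor = metrics.sensor(name);
                sensor.add("topic." + topic + ".record-error-rate", new Rate());
            }
            return sensor;
        }
    }

Because Metrics.sensor() is itself get-or-create, a second call with the same name returns the existing sensor rather than failing.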

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
index 05085e0..3b3fb2c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ b/clients/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.tools;
 
@@ -30,7 +26,8 @@ public class ProducerPerformance {
 
     public static void main(String[] args) throws Exception {
         if (args.length < 5) {
-            System.err.println("USAGE: java " + ProducerPerformance.class.getName() + " url topic_name num_records record_size acks [compression_type]");
+            System.err.println("USAGE: java " + ProducerPerformance.class.getName()
+                               + " url topic_name num_records record_size acks [compression_type]");
             System.exit(1);
         }
         String url = args[0];
@@ -43,8 +40,8 @@ public class ProducerPerformance {
         props.setProperty(ProducerConfig.BROKER_LIST_CONFIG, url);
         props.setProperty(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, Integer.toString(5 * 1000));
         props.setProperty(ProducerConfig.REQUEST_TIMEOUT_CONFIG, Integer.toString(Integer.MAX_VALUE));
-        props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(256 * 1024 * 1024));
-        props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(256 * 1024));
+        props.setProperty(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, Integer.toString(64 * 1024 * 1024));
+        props.setProperty(ProducerConfig.MAX_PARTITION_SIZE_CONFIG, Integer.toString(64 * 1024));
         if (args.length == 6)
             props.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, args[5]);
 
@@ -65,9 +62,9 @@ public class ProducerPerformance {
         for (int i = 0; i < numRecords; i++) {
             long sendStart = System.currentTimeMillis();
             producer.send(record, callback);
-            long sendEllapsed = System.currentTimeMillis() - sendStart;
-            maxLatency = Math.max(maxLatency, sendEllapsed);
-            totalLatency += sendEllapsed;
+            long sendElapsed = System.currentTimeMillis() - sendStart;
+            maxLatency = Math.max(maxLatency, sendElapsed);
+            totalLatency += sendElapsed;
             if (i % reportingInterval == 0) {
                 System.out.printf("%d  max latency = %d ms, avg latency = %.5f\n",
                                   i,
@@ -81,7 +78,7 @@ public class ProducerPerformance {
         double msgsSec = 1000.0 * numRecords / (double) ellapsed;
         double mbSec = msgsSec * (recordSize + Records.LOG_OVERHEAD) / (1024.0 * 1024.0);
         producer.close();
-        System.out.printf("%d records sent in %d ms ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec);
+        System.out.printf("%d records sent in %d ms. %.2f records per second (%.2f mb/sec).\n", numRecords, ellapsed, msgsSec, mbSec);
     }
 
 }
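
For reference, with the new clients jar on the classpath (the jar name below is a placeholder), an invocation matching the usage string above would look something like:

    java -cp kafka-clients.jar org.apache.kafka.clients.tools.ProducerPerformance localhost:9092 perf-test 1000000 100 1

The optional sixth argument selects a compression type; when it is omitted the producer's default compression is used.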

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index 6db2dfb..49be401 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -1,32 +1,27 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 
-
 /**
  * A registry of sensors and metrics.
  * <p>
@@ -67,7 +62,7 @@ public class Metrics {
      * Create a metrics repository with no metric reporters and default configuration.
      */
     public Metrics(Time time) {
-        this(new MetricConfig(), new ArrayList<MetricsReporter>(), time);
+        this(new MetricConfig(), new ArrayList<MetricsReporter>(0), time);
     }
 
     /**
@@ -87,8 +82,8 @@ public class Metrics {
      */
     public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) {
         this.config = defaultConfig;
-        this.sensors = new ConcurrentHashMap<String, Sensor>();
-        this.metrics = new ConcurrentHashMap<String, KafkaMetric>();
+        this.sensors = new CopyOnWriteMap<String, Sensor>();
+        this.metrics = new CopyOnWriteMap<String, KafkaMetric>();
         this.reporters = Utils.notNull(reporters);
         this.time = time;
         for (MetricsReporter reporter : reporters)
@@ -96,8 +91,26 @@ public class Metrics {
     }
 
     /**
-     * Create a sensor with the given unique name and zero or more parent sensors. All parent sensors will receive every
-     * value recorded with this sensor.
+     * Get the sensor with the given name if it exists
+     * @param name The name of the sensor
+     * @return Return the sensor or null if no such sensor exists
+     */
+    public Sensor getSensor(String name) {
+        return this.sensors.get(Utils.notNull(name));
+    }
+
+    /**
+     * Get or create a sensor with the given unique name and no parent sensors.
+     * @param name The sensor name
+     * @return The sensor
+     */
+    public Sensor sensor(String name) {
+        return sensor(name, null, (Sensor[]) null);
+    }
+
+    /**
+     * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+     * receive every value recorded with this sensor.
      * @param name The name of the sensor
      * @param parents The parent sensors
      * @return The sensor that is created
@@ -107,15 +120,15 @@ public class Metrics {
     }
 
     /**
-     * Create a sensor with the given unique name and zero or more parent sensors. All parent sensors will receive every
-     * value recorded with this sensor.
+     * Get or create a sensor with the given unique name and zero or more parent sensors. All parent sensors will
+     * receive every value recorded with this sensor.
      * @param name The name of the sensor
      * @param config A default configuration to use for this sensor for metrics that don't have their own config
      * @param parents The parent sensors
      * @return The sensor that is created
      */
     public synchronized Sensor sensor(String name, MetricConfig config, Sensor... parents) {
-        Sensor s = this.sensors.get(Utils.notNull(name));
+        Sensor s = getSensor(name);
         if (s == null) {
             s = new Sensor(this, name, parents, config == null ? this.config : config, time);
             this.sensors.put(name, s);
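
The new getSensor() is a pure lookup while sensor() is get-or-create: a second call with the same name returns the existing instance instead of creating a duplicate. A quick sketch of those semantics (the class and sensor names are illustrative):

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.utils.SystemTime;

    public class MetricsLookupSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics(new SystemTime());
            Sensor first = metrics.sensor("request-latency");      // created on first call
            Sensor second = metrics.sensor("request-latency");     // existing sensor returned
            System.out.println(first == second);                   // true
            System.out.println(metrics.getSensor("no-such-name")); // null, nothing created
        }
    }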

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
index 7e4849b..d68349b 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics;
 
@@ -26,7 +22,6 @@ import org.apache.kafka.common.metrics.CompoundStat.NamedMeasurable;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 
-
 /**
  * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
  * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
@@ -46,7 +41,7 @@ public final class Sensor {
         super();
         this.registry = registry;
         this.name = Utils.notNull(name);
-        this.parents = parents;
+        this.parents = parents == null ? new Sensor[0] : parents;
         this.metrics = new ArrayList<KafkaMetric>();
         this.stats = new ArrayList<Stat>();
         this.config = config;
@@ -86,27 +81,39 @@ public final class Sensor {
         record(value, time.nanoseconds());
     }
 
-    private void record(double value, long time) {
+    /**
+     * Record a value at a known time. This method is slightly faster than {@link #record(double)} since it will reuse
+     * the time stamp.
+     * @param value The value we are recording
+     * @param time The time in nanoseconds
+     * @throws QuotaViolationException if recording this value moves a metric beyond its configured maximum or minimum
+     *         bound
+     */
+    public void record(double value, long time) {
         synchronized (this) {
             // increment all the stats
             for (int i = 0; i < this.stats.size(); i++)
                 this.stats.get(i).record(config, value, time);
             checkQuotas(time);
-
         }
         for (int i = 0; i < parents.length; i++)
             parents[i].record(value, time);
     }
 
+    /**
+     * Check if we have violated our quota for any metric that has a configured quota
+     * @param time
+     */
     private void checkQuotas(long time) {
         for (int i = 0; i < this.metrics.size(); i++) {
             KafkaMetric metric = this.metrics.get(i);
             MetricConfig config = metric.config();
             if (config != null) {
                 Quota quota = config.quota();
-                if (quota != null)
+                if (quota != null) {
                     if (!quota.acceptable(metric.value(time)))
                         throw new QuotaViolationException("Metric " + metric.name() + " is in violation of its quota of " + quota.bound());
+                }
             }
         }
     }
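
Values recorded on a sensor also flow to its parent sensors, which is how the bytes-sent and bytes-received sensors in the Selector changes below feed a shared bytes-sent-received aggregate. A small sketch of that hierarchy (the class, sensor, and metric names are illustrative):

    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Count;
    import org.apache.kafka.common.metrics.stats.Rate;
    import org.apache.kafka.common.utils.SystemTime;

    public class ParentSensorSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics(new SystemTime());
            Sensor allBytes = metrics.sensor("bytes-sent-received");
            Sensor sent = metrics.sensor("bytes-sent", allBytes);          // child of allBytes
            allBytes.add("network-ops-per-second", new Rate(new Count())); // counts recordings
            sent.add("bytes-sent-per-second", new Rate());                 // sums recorded bytes
            sent.record(512); // updates the child metric and, via the parent, the op rate too
        }
    }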

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
index 4d54916..b47ed88 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
@@ -74,7 +74,7 @@ public class Percentiles extends SampledStat implements CompoundStat {
     }
 
     public double value(MetricConfig config, long now, double quantile) {
-        timeoutObsoleteSamples(config, now);
+        purgeObsoleteSamples(config, now);
         float count = 0.0f;
         for (Sample sample : this.samples)
             count += sample.eventCount;

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
index 3b0454f..7f5cc53 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics.stats;
 
@@ -22,12 +18,11 @@ import java.util.concurrent.TimeUnit;
 import org.apache.kafka.common.metrics.MeasurableStat;
 import org.apache.kafka.common.metrics.MetricConfig;
 
-
 /**
- * The rate of the given quanitity. By default this is the total observed over a set of samples from a sampled statistic
- * divided by the ellapsed time over the sample windows. Alternative {@link SampledStat} implementations can be
- * provided, however, to record the rate of occurences (e.g. the count of values measured over the time interval) or
- * other such values.
+ * The rate of the given quantity. By default this is the total observed over a set of samples from a sampled statistic
+ * divided by the elapsed time over the sample windows. Alternative {@link SampledStat} implementations can be provided,
+ * however, to record the rate of occurrences (e.g. the count of values measured over the time interval) or other such
+ * values.
  */
 public class Rate implements MeasurableStat {
 
@@ -42,6 +37,10 @@ public class Rate implements MeasurableStat {
         this(unit, new SampledTotal());
     }
 
+    public Rate(SampledStat stat) {
+        this(TimeUnit.SECONDS, stat);
+    }
+
     public Rate(TimeUnit unit, SampledStat stat) {
         this.stat = stat;
         this.unit = unit;
@@ -58,8 +57,9 @@ public class Rate implements MeasurableStat {
 
     @Override
     public double measure(MetricConfig config, long now) {
-        double ellapsed = convert(now - stat.oldest().lastWindow);
-        return stat.measure(config, now) / ellapsed;
+        double value = stat.measure(config, now);
+        double elapsed = convert(now - stat.oldest(now).lastWindow);
+        return value / elapsed;
     }
 
     private double convert(long time) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
index f8b413a..776f3a1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.metrics.stats;
 
@@ -22,7 +18,6 @@ import java.util.List;
 import org.apache.kafka.common.metrics.MeasurableStat;
 import org.apache.kafka.common.metrics.MetricConfig;
 
-
 /**
  * A SampledStat records a single scalar value measured over one or more samples. Each sample is recorded over a
  * configurable window. The window can be defined by number of events or ellapsed time (or both, if both are given the
@@ -72,7 +67,7 @@ public abstract class SampledStat implements MeasurableStat {
 
     @Override
     public double measure(MetricConfig config, long now) {
-        timeoutObsoleteSamples(config, now);
+        purgeObsoleteSamples(config, now);
         return combine(this.samples, config, now);
     }
 
@@ -82,20 +77,28 @@ public abstract class SampledStat implements MeasurableStat {
         return this.samples.get(this.current);
     }
 
-    public Sample oldest() {
-        return this.samples.get((this.current + 1) % this.samples.size());
+    public Sample oldest(long now) {
+        if (samples.size() == 0)
+            this.samples.add(newSample(now));
+        Sample oldest = this.samples.get(0);
+        for (int i = 1; i < this.samples.size(); i++) {
+            Sample curr = this.samples.get(i);
+            if (curr.lastWindow < oldest.lastWindow)
+                oldest = curr;
+        }
+        return oldest;
     }
 
     protected abstract void update(Sample sample, MetricConfig config, double value, long now);
 
     public abstract double combine(List<Sample> samples, MetricConfig config, long now);
 
-    /* Timeout any windows that have expired in the absense of any events */
-    protected void timeoutObsoleteSamples(MetricConfig config, long now) {
+    /* Timeout any windows that have expired in the absence of any events */
+    protected void purgeObsoleteSamples(MetricConfig config, long now) {
+        long expireAge = config.samples() * config.timeWindowNs();
         for (int i = 0; i < samples.size(); i++) {
-            int idx = (this.current + i) % samples.size();
-            Sample sample = this.samples.get(idx);
-            if (now - sample.lastWindow >= (i + 1) * config.timeWindowNs())
+            Sample sample = this.samples.get(i);
+            if (now - sample.lastWindow >= expireAge)
                 sample.reset(now);
         }
     }
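
Concretely, with the default SampledTotal a Rate reports total-over-elapsed-time: if 3,000 bytes were recorded across the retained samples and the oldest retained window began 1.5 seconds before "now", measure() returns 3000 / 1.5 = 2000 bytes per second; substituting Rate(new Count()) turns the same calculation into recordings per second. The purge above resets any sample whose window is older than samples * timeWindowNs, so values that have aged out no longer inflate the numerator.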

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
index 9305b61..6350424 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.network;
 
@@ -29,6 +25,7 @@ public class ByteBufferSend implements Send {
     private final int destination;
     protected final ByteBuffer[] buffers;
     private int remaining;
+    private int size;
 
     public ByteBufferSend(int destination, ByteBuffer... buffers) {
         super();
@@ -36,6 +33,7 @@ public class ByteBufferSend implements Send {
         this.buffers = buffers;
         for (int i = 0; i < buffers.length; i++)
             remaining += buffers[i].remaining();
+        this.size = remaining;
     }
 
     @Override
@@ -58,6 +56,10 @@ public class ByteBufferSend implements Send {
         return this.remaining;
     }
 
+    public int size() {
+        return this.size;
+    }
+
     @Override
     public long writeTo(GatheringByteChannel channel) throws IOException {
         long written = channel.write(buffers);

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 51d4892..dcc639a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.network;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 9839632..02c0606 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -25,8 +25,18 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,21 +78,25 @@ public class Selector implements Selectable {
     private final List<NetworkReceive> completedReceives;
     private final List<Integer> disconnected;
     private final List<Integer> connected;
+    private final Time time;
+    private final SelectorMetrics sensors;
 
     /**
      * Create a new selector
      */
-    public Selector() {
+    public Selector(Metrics metrics, Time time) {
         try {
             this.selector = java.nio.channels.Selector.open();
         } catch (IOException e) {
             throw new KafkaException(e);
         }
+        this.time = time;
         this.keys = new HashMap<Integer, SelectionKey>();
         this.completedSends = new ArrayList<NetworkSend>();
         this.completedReceives = new ArrayList<NetworkReceive>();
         this.connected = new ArrayList<Integer>();
         this.disconnected = new ArrayList<Integer>();
+        this.sensors = new SelectorMetrics(metrics);
     }
 
     /**
@@ -192,7 +206,11 @@ public class Selector implements Selectable {
         }
 
         /* check ready keys */
+        long startSelect = time.nanoseconds();
         int readyKeys = select(timeout);
+        long endSelect = time.nanoseconds();
+        this.sensors.selectTime.record(endSelect - startSelect, endSelect);
+
         if (readyKeys > 0) {
             Set<SelectionKey> keys = this.selector.selectedKeys();
             Iterator<SelectionKey> iter = keys.iterator();
@@ -208,6 +226,7 @@ public class Selector implements Selectable {
                         channel.finishConnect();
                         key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
                         this.connected.add(transmissions.id);
+                        this.sensors.connectionCreated.record();
                     }
 
                     /* read from any connections that have readable data */
@@ -218,6 +237,7 @@ public class Selector implements Selectable {
                         if (transmissions.receive.complete()) {
                             transmissions.receive.payload().rewind();
                             this.completedReceives.add(transmissions.receive);
+                            this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit());
                             transmissions.clearReceive();
                         }
                     }
@@ -227,6 +247,7 @@ public class Selector implements Selectable {
                         transmissions.send.writeTo(channel);
                         if (transmissions.send.remaining() <= 0) {
                             this.completedSends.add(transmissions.send);
+                            this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
                             transmissions.clearSend();
                             key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                         }
@@ -241,6 +262,8 @@ public class Selector implements Selectable {
                 }
             }
         }
+        long endIo = time.nanoseconds();
+        this.sensors.ioTime.record(endIo - endSelect, endIo);
     }
 
     @Override
@@ -309,6 +332,7 @@ public class Selector implements Selectable {
         } catch (IOException e) {
             log.error("Exception closing connection to node {}:", trans.id, e);
         }
+        this.sensors.connectionClosed.record();
     }
 
     /**
@@ -364,4 +388,87 @@ public class Selector implements Selectable {
         }
     }
 
+    private class SelectorMetrics {
+        private final Metrics metrics;
+        public final Sensor connectionClosed;
+        public final Sensor connectionCreated;
+        public final Sensor bytesTransferred;
+        public final Sensor bytesSent;
+        public final Sensor bytesReceived;
+        public final Sensor selectTime;
+        public final Sensor ioTime;
+
+        public SelectorMetrics(Metrics metrics) {
+            this.metrics = metrics;
+            this.connectionClosed = this.metrics.sensor("connections-closed");
+            this.connectionCreated = this.metrics.sensor("connections-created");
+            this.bytesTransferred = this.metrics.sensor("bytes-sent-received");
+            this.bytesSent = this.metrics.sensor("bytes-sent", bytesTransferred);
+            this.bytesReceived = this.metrics.sensor("bytes-received", bytesTransferred);
+            this.selectTime = this.metrics.sensor("select-time");
+            this.ioTime = this.metrics.sensor("io-time");
+            bytesTransferred.add("network-ops-per-second",
+                                 "The average number of network operations (reads or writes) on all connections per second.",
+                                 new Rate(new Count()));
+            this.bytesSent.add("bytes-sent-per-second", "The average number of outgoing bytes sent per second to all servers.", new Rate());
+            this.bytesSent.add("requests-sent-per-second", "The average number of requests sent per second.", new Rate(new Count()));
+            this.bytesSent.add("request-size-avg", "The average size of all requests in the window..", new Avg());
+            this.bytesSent.add("request-size-max", "The maximum size of any request sent in the window.", new Max());
+            this.bytesReceived.add("bytes-received-per-second", "Bytes/second read off all sockets", new Rate());
+            this.bytesReceived.add("responses-received-per-second", "Responses received sent per second.", new Rate(new Count()));
+            this.connectionCreated.add("connections-created-per-second",
+                                       "New connections established per second in the window.",
+                                       new Rate());
+            this.connectionClosed.add("connections-closed-per-second", "Connections closed per second in the window.", new Rate());
+            this.selectTime.add("select-calls-per-second",
+                                "Number of times the I/O layer checked for new I/O to perform per second",
+                                new Rate(new Count()));
+            this.selectTime.add("io-wait-time-avg-ns",
+                                "The average length of time the I/O thread speant waiting for a socket ready for reads or writes in nanoseconds.",
+                                new Avg());
+            this.selectTime.add("io-wait-percentage", "The fraction of time the I/O thread spent waiting.", new Rate(TimeUnit.NANOSECONDS));
+            this.ioTime.add("io-time-avg-ns", "The average length of time for I/O per select call in nanoseconds.", new Avg());
+            this.ioTime.add("io-percentage", "The fraction of time the I/O thread spent doing I/O", new Rate(TimeUnit.NANOSECONDS));
+            this.metrics.addMetric("connection-count", "The current number of active connections.", new Measurable() {
+                public double measure(MetricConfig config, long now) {
+                    return keys.size();
+                }
+            });
+        }
+
+        public void recordBytesSent(int node, int bytes) {
+            this.bytesSent.record(bytes);
+            if (node >= 0) {
+                String name = "node-" + node + ".bytes-sent";
+                Sensor sensor = this.metrics.getSensor(name);
+                if (sensor == null) {
+                    sensor = this.metrics.sensor(name);
+                    sensor.add("node-" + node + ".bytes-sent-per-second", new Rate());
+                    sensor.add("node-" + node + ".requests-sent-per-second",
+                               "The average number of requests sent per second.",
+                               new Rate(new Count()));
+                    sensor.add("connection-" + node + ".request-size-avg", "The average size of all requests in the window..", new Avg());
+                    sensor.add("node-" + node + ".request-size-max", "The maximum size of any request sent in the window.", new Max());
+                }
+                sensor.record(bytes);
+            }
+        }
+
+        public void recordBytesReceived(int node, int bytes) {
+            this.bytesReceived.record(bytes);
+            if (node >= 0) {
+                String name = "node-" + node + ".bytes-received";
+                Sensor sensor = this.metrics.getSensor(name);
+                if (sensor == null) {
+                    sensor = this.metrics.sensor(name);
+                    sensor.add("node-" + node + ".bytes-received-per-second", new Rate());
+                    sensor.add("node-" + node + ".responses-received-per-second",
+                               "The average number of responses received per second.",
+                               new Rate(new Count()));
+                }
+                sensor.record(bytes);
+            }
+        }
+    }
+
 }
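
Besides sensor-backed statistics, both metrics classes in this patch register gauge-style metrics directly with addMetric(): the Measurable callback is evaluated when the metric is read rather than updated on every event (requests-in-flight, metadata-age, connection-count). A minimal sketch of the same pattern (the queue and metric name are illustrative):

    import java.util.ArrayDeque;
    import java.util.Deque;

    import org.apache.kafka.common.metrics.Measurable;
    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.utils.SystemTime;

    public class GaugeSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics(new SystemTime());
            final Deque<byte[]> pending = new ArrayDeque<byte[]>();
            metrics.addMetric("pending-sends", "The current number of queued sends.", new Measurable() {
                public double measure(MetricConfig config, long now) {
                    return pending.size(); // sampled whenever the metric is read
                }
            });
        }
    }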

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java b/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java
index 187d22f..9c0e81a 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CopyOnWriteMap.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.common.utils;
 
@@ -24,7 +20,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 /**
- * A simple read-optimized map implementation that synchronizes only writes and does a fully copy on each modification
+ * A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification
  */
 public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
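
Metrics switches its sensor and metric registries to this map (see the Metrics.java change above); the read-optimized trade-off fits a registry that is read on every record() call but modified only when a sensor or metric is registered. A rough sketch of the copy-on-write idea described in the javadoc (the field and method bodies are illustrative, not the actual implementation):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class CopyOnWriteSketch<K, V> {
        // Reads see an immutable snapshot through a volatile reference.
        private volatile Map<K, V> map = Collections.emptyMap();

        public V get(Object key) {                 // lock-free read
            return map.get(key);
        }

        public synchronized V put(K key, V value) {
            Map<K, V> copy = new HashMap<K, V>(this.map); // full copy on each modification
            V previous = copy.put(key, value);
            this.map = Collections.unmodifiableMap(copy);
            return previous;
        }
    }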
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
index 09a5355..8b4ac0f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
@@ -1,18 +1,14 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
  */
 package org.apache.kafka.clients.producer;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/23d7fc47/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index ed56906..f37ab77 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -52,7 +52,7 @@ public class RecordAccumulatorTest {
         }
         accum.append(tp, key, value, CompressionType.NONE, null);
         assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
-        List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE);
+        List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0);
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
         Iterator<LogEntry> iter = batch.records.iterator();
@@ -80,7 +80,7 @@ public class RecordAccumulatorTest {
         assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size());
         time.sleep(10);
         assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
-        List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE);
+        List<RecordBatch> batches = accum.drain(asList(tp), Integer.MAX_VALUE, 0);
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);
         Iterator<LogEntry> iter = batch.records.iterator();
@@ -101,7 +101,7 @@ public class RecordAccumulatorTest {
         }
         assertEquals("Both partitions should be ready", 2, accum.ready(time.milliseconds()).size());
 
-        List<RecordBatch> batches = accum.drain(partitions, 1024);
+        List<RecordBatch> batches = accum.drain(partitions, 1024, 0);
         assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size());
     }
 
@@ -131,7 +131,7 @@ public class RecordAccumulatorTest {
         long now = time.milliseconds();
         while (read < numThreads * msgs) {
             List<TopicPartition> tps = accum.ready(now);
-            List<RecordBatch> batches = accum.drain(tps, 5 * 1024);
+            List<RecordBatch> batches = accum.drain(tps, 5 * 1024, 0);
             for (RecordBatch batch : batches) {
                 for (LogEntry entry : batch.records)
                     read++;