Posted to commits@kafka.apache.org by jk...@apache.org on 2014/03/05 05:07:50 UTC

git commit: KAFKA-1286 Retry can block. Patch from Guozhang, reviewed by Jay.

Repository: kafka
Updated Branches:
  refs/heads/trunk 153ac8aa6 -> 5ba48348b


KAFKA-1286 Retry can block. Patch from Guozhang, reviewed by Jay.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5ba48348
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5ba48348
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5ba48348

Branch: refs/heads/trunk
Commit: 5ba48348b3abb8f84fda0798d992ff2e0a04051d
Parents: 153ac8a
Author: Jay Kreps <ja...@gmail.com>
Authored: Tue Mar 4 20:05:51 2014 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Tue Mar 4 20:07:00 2014 -0800

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  1 +
 .../kafka/clients/producer/ProducerConfig.java  |  9 ++-
 .../producer/internals/RecordAccumulator.java   | 19 ++++-
 .../clients/producer/internals/RecordBatch.java |  2 +
 .../clients/producer/internals/Sender.java      | 74 ++++++++++---------
 .../apache/kafka/common/protocol/ApiKeys.java   | 14 +++-
 .../kafka/common/requests/MetadataRequest.java  | 12 ++++
 .../kafka/common/requests/MetadataResponse.java | 12 ++++
 .../kafka/common/requests/ProduceResponse.java  | 57 +++++++++++++++
 .../clients/producer/RecordAccumulatorTest.java | 10 +--
 .../kafka/clients/producer/SenderTest.java      |  2 +-
 .../kafka/controller/KafkaController.scala      |  5 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  4 +-
 .../kafka/api/ProducerFailureHandlingTest.scala | 75 +++++++++++++++++---
 14 files changed, 237 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index bedd2a9..1ac6943 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -102,6 +102,7 @@ public class KafkaProducer implements Producer {
         this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.MAX_PARTITION_SIZE_CONFIG),
                                                  this.totalMemorySize,
                                                  config.getLong(ProducerConfig.LINGER_MS_CONFIG),
+                                                 config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                                                  config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG),
                                                  metrics,
                                                  new SystemTime());

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index d8e35e7..307659c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -136,6 +136,12 @@ public class ProducerConfig extends AbstractConfig {
     public static final String MAX_RETRIES_CONFIG = "request.retries";
 
     /**
+     * The amount of time to wait before attempting to resend produce request to a given topic partition. This avoids
+     * repeated sending-and-failing in a tight loop
+     */
+    public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
+
+    /**
      * Should we register the Kafka metrics as JMX mbeans?
      */
     public static final String ENABLE_JMX_CONFIG = "enable.jmx";
@@ -160,7 +166,8 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 10L, atLeast(0L), "blah blah")
                                 .define(BLOCK_ON_BUFFER_FULL_CONFIG, Type.BOOLEAN, true, "blah blah")
                                 .define(ENABLE_JMX_CONFIG, Type.BOOLEAN, true, "")
-                                .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "");
+                                .define(MAX_RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), "")
+                                .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 500L, atLeast(0L), "blah blah");
     }
 
     ProducerConfig(Map<? extends Object, ? extends Object> props) {
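
For context, a minimal sketch of how a client could enable the backoff together with retries using the keys added in this patch; the broker address, topic name, and payloads below are illustrative, and only the two retry settings come from this change (usage otherwise mirrors ProducerFailureHandlingTest further down).

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RetryBackoffExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BROKER_LIST_CONFIG, "localhost:9092"); // illustrative broker address
            props.put(ProducerConfig.MAX_RETRIES_CONFIG, "10");             // "request.retries": allow retries
            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");      // "retry.backoff.ms": wait 1s between attempts (default 500)
            KafkaProducer producer = new KafkaProducer(props);
            producer.send(new ProducerRecord("test-topic", null, "key".getBytes(), "value".getBytes()));
            producer.close();
        }
    }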

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 6990274..e455987 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -51,9 +51,10 @@ public final class RecordAccumulator {
     private int drainIndex;
     private final int batchSize;
     private final long lingerMs;
-    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
+    private final long retryBackoffMs;
     private final BufferPool free;
     private final Time time;
+    private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
 
     /**
      * Create a new record accumulator
@@ -63,16 +64,25 @@ public final class RecordAccumulator {
      * @param lingerMs An artificial delay time to add before declaring a records instance that isn't full ready for
      *        sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some
      *        latency for potentially better throughput due to more batching (and hence fewer, larger requests).
+     * @param retryBackoffMs An artificial delay time to retry the produce request upon receiving an error. This avoids
+     *        exhausting all retries in a short period of time.
      * @param blockOnBufferFull If true block when we are out of memory; if false throw an exception when we are out of
      *        memory
      * @param metrics The metrics
      * @param time The time instance to use
      */
-    public RecordAccumulator(int batchSize, long totalSize, long lingerMs, boolean blockOnBufferFull, Metrics metrics, Time time) {
+    public RecordAccumulator(int batchSize,
+                             long totalSize,
+                             long lingerMs,
+                             long retryBackoffMs,
+                             boolean blockOnBufferFull,
+                             Metrics metrics,
+                             Time time) {
         this.drainIndex = 0;
         this.closed = false;
         this.batchSize = batchSize;
         this.lingerMs = lingerMs;
+        this.retryBackoffMs = retryBackoffMs;
         this.batches = new CopyOnWriteMap<TopicPartition, Deque<RecordBatch>>();
         this.free = new BufferPool(totalSize, batchSize, blockOnBufferFull);
         this.time = time;
@@ -155,6 +165,7 @@ public final class RecordAccumulator {
      */
     public void reenqueue(RecordBatch batch, long now) {
         batch.attempts++;
+        batch.lastAttempt = now;
         Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);
@@ -181,9 +192,11 @@ public final class RecordAccumulator {
             synchronized (deque) {
                 RecordBatch batch = deque.peekFirst();
                 if (batch != null) {
+                    boolean backingOff = batch.attempts > 0 && batch.lastAttempt + retryBackoffMs > now;
                     boolean full = deque.size() > 1 || !batch.records.buffer().hasRemaining();
                     boolean expired = now - batch.created >= lingerMs;
-                    if (full | expired | exhausted | closed)
+                    boolean sendable = full | expired | exhausted | closed;
+                    if (sendable & !backingOff)
                         ready.add(batch.topicPartition);
                 }
             }
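
To make the new ready() condition concrete, the predicate can be restated on its own as below; this is an illustrative restatement of the hunk above, not code from the patch. A batch that has already been attempted stays ineligible until retryBackoffMs has elapsed since its last attempt, even if it is otherwise full, expired, or being flushed.

    public final class ReadinessSketch {
        // Mirrors the sendable/backingOff logic added to RecordAccumulator.ready()
        static boolean sendable(long now, long lingerMs, long retryBackoffMs,
                                int attempts, long lastAttempt, long created,
                                boolean full, boolean exhausted, boolean closed) {
            boolean backingOff = attempts > 0 && lastAttempt + retryBackoffMs > now; // still inside the backoff window
            boolean expired = now - created >= lingerMs;
            boolean ready = full || expired || exhausted || closed;
            return ready && !backingOff;
        }

        public static void main(String[] args) {
            // A full batch re-enqueued at t=1000 with a 500 ms backoff is held until t >= 1500.
            System.out.println(sendable(1200L, 10L, 500L, 1, 1000L, 900L, true, false, false)); // false
            System.out.println(sendable(1500L, 10L, 500L, 1, 1000L, 900L, true, false, false)); // true
        }
    }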

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index c7fbf3c..038a05a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -34,6 +34,7 @@ public final class RecordBatch {
     public int recordCount = 0;
     public volatile int attempts = 0;
     public final long created;
+    public long lastAttempt;
     public final MemoryRecords records;
     public final TopicPartition topicPartition;
     private final ProduceRequestResult produceFuture;
@@ -41,6 +42,7 @@ public final class RecordBatch {
 
     public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
         this.created = now;
+        this.lastAttempt = now;
         this.records = records;
         this.topicPartition = tp;
         this.produceFuture = new ProduceRequestResult();

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 7942623..2acb96d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.RequestSend;
 import org.apache.kafka.common.requests.ResponseHeader;
@@ -142,7 +143,7 @@ public class Sender implements Runnable {
      * The main run loop for the sender thread
      */
     public void run() {
-        log.trace("Starting Kafka producer I/O thread.");
+        log.debug("Starting Kafka producer I/O thread.");
 
         // main loop, runs until close is called
         while (running) {
@@ -153,7 +154,7 @@ public class Sender implements Runnable {
             }
         }
 
-        log.trace("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
+        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
 
         // okay we stopped accepting requests but there may still be
         // requests in the accumulator or waiting for acknowledgment,
@@ -170,7 +171,7 @@ public class Sender implements Runnable {
         // close all the connections
         this.selector.close();
 
-        log.trace("Shutdown of Kafka producer I/O thread has completed.");
+        log.debug("Shutdown of Kafka producer I/O thread has completed.");
     }
 
     /**
@@ -216,8 +217,8 @@ public class Sender implements Runnable {
 
         // handle responses, connections, and disconnections
         handleSends(this.selector.completedSends());
-        handleResponses(this.selector.completedReceives(), now);
-        handleDisconnects(this.selector.disconnected(), now);
+        handleResponses(this.selector.completedReceives(), time.milliseconds());
+        handleDisconnects(this.selector.disconnected(), time.milliseconds());
         handleConnects(this.selector.connected());
 
         return ready.size();
@@ -348,15 +349,25 @@ public class Sender implements Runnable {
             nodeStates.disconnected(node);
             log.debug("Node {} disconnected.", node);
             for (InFlightRequest request : this.inFlightRequests.clearAll(node)) {
-                if (request.batches != null) {
-                    for (RecordBatch batch : request.batches.values()) {
-                        if (canRetry(batch, Errors.NETWORK_EXCEPTION)) {
-                            this.accumulator.reenqueue(batch, now);
-                        } else {
-                            batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
-                            this.accumulator.deallocate(batch);
+                ApiKeys requestKey = ApiKeys.forId(request.request.header().apiKey());
+                switch (requestKey) {
+                    case PRODUCE:
+                        for (RecordBatch batch : request.batches.values()) {
+                            if (canRetry(batch, Errors.NETWORK_EXCEPTION)) {
+                                log.warn("Destination node disconnected for topic-partition {}, retrying ({} attempts left).",
+                                    batch.topicPartition, this.retries - batch.attempts - 1);
+                                this.accumulator.reenqueue(batch, now);
+                            } else {
+                                batch.done(-1L, new NetworkException("The server disconnected unexpectedly without sending a response."));
+                                this.accumulator.deallocate(batch);
+                            }
                         }
-                    }
+                        break;
+                    case METADATA:
+                        metadataFetchInProgress = false;
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unexpected api key id: " + requestKey.id);
                 }
             }
         }
@@ -409,18 +420,18 @@ public class Sender implements Runnable {
             correlate(req.request.header(), header);
             if (req.request.header().apiKey() == ApiKeys.PRODUCE.id) {
                 log.trace("Received produce response from node {} with correlation id {}", source, req.request.header().correlationId());
-                handleProduceResponse(req, body, now);
+                handleProduceResponse(req, req.request.header(), body, now);
             } else if (req.request.header().apiKey() == ApiKeys.METADATA.id) {
                 log.trace("Received metadata response response from node {} with correlation id {}", source, req.request.header()
-                                                                                                                        .correlationId());
-                handleMetadataResponse(body, now);
+                    .correlationId());
+                handleMetadataResponse(req.request.header(), body, now);
             } else {
                 throw new IllegalStateException("Unexpected response type: " + req.request.header().apiKey());
             }
         }
     }
 
-    private void handleMetadataResponse(Struct body, long now) {
+    private void handleMetadataResponse(RequestHeader header, Struct body, long now) {
         this.metadataFetchInProgress = false;
         MetadataResponse response = new MetadataResponse(body);
         Cluster cluster = response.cluster();
@@ -429,35 +440,30 @@ public class Sender implements Runnable {
         if (cluster.nodes().size() > 0)
             this.metadata.update(cluster, now);
         else
-            log.trace("Ignoring empty metadata response.");
+            log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
     }
 
     /**
      * Handle a produce response
      */
-    private void handleProduceResponse(InFlightRequest request, Struct response, long now) {
-        for (Object topicResponse : (Object[]) response.get("responses")) {
-            Struct topicRespStruct = (Struct) topicResponse;
-            String topic = (String) topicRespStruct.get("topic");
-            for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) {
-                Struct partRespStruct = (Struct) partResponse;
-                int partition = (Integer) partRespStruct.get("partition");
-                short errorCode = (Short) partRespStruct.get("error_code");
-
-                // if we got an error we may need to refresh our metadata
-                Errors error = Errors.forCode(errorCode);
+    private void handleProduceResponse(InFlightRequest request, RequestHeader header, Struct body, long now) {
+        ProduceResponse pr = new ProduceResponse(body);
+        for (Map<TopicPartition, ProduceResponse.PartitionResponse> responses : pr.responses().values()) {
+            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : responses.entrySet()) {
+                TopicPartition tp = entry.getKey();
+                ProduceResponse.PartitionResponse response = entry.getValue();
+                Errors error = Errors.forCode(response.errorCode);
                 if (error.exception() instanceof InvalidMetadataException)
                     metadata.forceUpdate();
-
-                long offset = (Long) partRespStruct.get("base_offset");
-                RecordBatch batch = request.batches.get(new TopicPartition(topic, partition));
+                RecordBatch batch = request.batches.get(tp);
                 if (canRetry(batch, error)) {
                     // retry
-                    log.warn("Got error for topic-partition {}, retrying. Error: {}", topic, partition, error);
+                    log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
+                        header.correlationId(), batch.topicPartition, this.retries - batch.attempts - 1, error);
                     this.accumulator.reenqueue(batch, now);
                 } else {
                     // tell the user the result of their request
-                    batch.done(offset, error.exception());
+                    batch.done(response.baseOffset, error.exception());
                     this.accumulator.deallocate(batch);
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 21a2592..6fe7573 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.common.protocol;
 
+
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Identifiers for all the Kafka APIs
  */
@@ -29,12 +33,17 @@ public enum ApiKeys {
     OFFSET_COMMIT(6, "offset_commit"),
     OFFSET_FETCH(7, "offset_fetch");
 
-    public static int MAX_API_KEY = 0;
+    private static ApiKeys[] codeToType;
+    public static int MAX_API_KEY = -1;
 
     static {
         for (ApiKeys key : ApiKeys.values()) {
             MAX_API_KEY = Math.max(MAX_API_KEY, key.id);
         }
+        codeToType = new ApiKeys[MAX_API_KEY+1];
+        for (ApiKeys key : ApiKeys.values()) {
+            codeToType[key.id] = key;
+        }
     }
 
     /** the perminant and immutable id of an API--this can't change ever */
@@ -48,4 +57,7 @@ public enum ApiKeys {
         this.name = name;
     }
 
+    public static ApiKeys forId(int id) {
+        return codeToType[id];
+    }
 }
\ No newline at end of file
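
A brief usage note on the lookup added here, which the reworked disconnect handling in Sender relies on to dispatch on the request type; the round trip below is only a sketch and assumes ids defined in this enum are the only ones passed in.

    import org.apache.kafka.common.protocol.ApiKeys;

    public class ApiKeysLookupSketch {
        public static void main(String[] args) {
            for (ApiKeys key : ApiKeys.values()) {
                // forId(key.id) indexes the codeToType table and returns the same constant
                if (ApiKeys.forId(key.id) != key)
                    throw new AssertionError("lookup mismatch for " + key);
            }
            System.out.println("max api key = " + ApiKeys.MAX_API_KEY);
        }
    }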

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 91b9d64..f35bd87 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
 package org.apache.kafka.common.requests;
 
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 73b7006..2652c32 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -1,3 +1,15 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
 package org.apache.kafka.common.requests;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
new file mode 100644
index 0000000..6ac2e53
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ProduceResponse {
+    public class PartitionResponse {
+        public int partitionId;
+        public short errorCode;
+        public long baseOffset;
+
+        public PartitionResponse(int partitionId, short errorCode, long baseOffset) {
+            this.partitionId = partitionId;
+            this.errorCode = errorCode;
+            this.baseOffset = baseOffset;
+        }
+    }
+
+    private final Map<String, Map<TopicPartition, PartitionResponse>> responses;
+
+    public ProduceResponse(Struct struct) {
+        responses = new HashMap<String, Map<TopicPartition, PartitionResponse>>();
+        for (Object topicResponse : (Object[]) struct.get("responses")) {
+            Struct topicRespStruct = (Struct) topicResponse;
+            String topic = (String) topicRespStruct.get("topic");
+            Map<TopicPartition, PartitionResponse> topicResponses = new HashMap<TopicPartition, PartitionResponse>();
+            for (Object partResponse : (Object[]) topicRespStruct.get("partition_responses")) {
+                Struct partRespStruct = (Struct) partResponse;
+                int partition = (Integer) partRespStruct.get("partition");
+                short errorCode = (Short) partRespStruct.get("error_code");
+                long offset = (Long) partRespStruct.get("base_offset");
+                TopicPartition tp = new TopicPartition(topic, partition);
+                topicResponses.put(tp, new PartitionResponse(partition, errorCode, offset));
+            }
+            responses.put(topic, topicResponses);
+        }
+    }
+
+    public Map<String, Map<TopicPartition, PartitionResponse>> responses() {
+        return this.responses;
+    }
+}
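
For reference, a minimal sketch of how the parsed response is consumed, mirroring handleProduceResponse in Sender above; the surrounding method and the incoming Struct are assumed to be supplied by the caller.

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.protocol.Errors;
    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.requests.ProduceResponse;

    public class ProduceResponseSketch {
        static void printResponse(Struct body) {
            ProduceResponse pr = new ProduceResponse(body);
            for (Map<TopicPartition, ProduceResponse.PartitionResponse> byPartition : pr.responses().values()) {
                for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : byPartition.entrySet()) {
                    Errors error = Errors.forCode(entry.getValue().errorCode);
                    System.out.println(entry.getKey() + " -> base offset " + entry.getValue().baseOffset + ", error: " + error);
                }
            }
        }
    }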

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
index a3bf07e..ed56906 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
@@ -44,7 +44,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testFull() throws Exception {
         long now = time.milliseconds();
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
         int appends = 1024 / msgSize;
         for (int i = 0; i < appends; i++) {
             accum.append(tp, key, value, CompressionType.NONE, null);
@@ -67,7 +67,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
-        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time);
         accum.append(tp, key, new byte[2 * batchSize], CompressionType.NONE, null);
         assertEquals("Our partition should be ready", asList(tp), accum.ready(time.milliseconds()));
     }
@@ -75,7 +75,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testLinger() throws Exception {
         long lingerMs = 10L;
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time);
         accum.append(tp, key, value, CompressionType.NONE, null);
         assertEquals("No partitions should be ready", 0, accum.ready(time.milliseconds()).size());
         time.sleep(10);
@@ -92,7 +92,7 @@ public class RecordAccumulatorTest {
 
     @Test
     public void testPartialDrain() throws Exception {
-        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, false, metrics, time);
+        RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 10L, 100L, false, metrics, time);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(new TopicPartition("test", 0), new TopicPartition("test", 1));
         for (TopicPartition tp : partitions) {
@@ -110,7 +110,7 @@ public class RecordAccumulatorTest {
         final int numThreads = 5;
         final int msgs = 10000;
         final int numParts = 10;
-        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, true, metrics, time);
+        final RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, 0L, 100L, true, metrics, time);
         List<Thread> threads = new ArrayList<Thread>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
index 19a0125..12c9500 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
@@ -58,7 +58,7 @@ public class SenderTest {
     private Metadata metadata = new Metadata(0, Long.MAX_VALUE);
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = new Metrics(time);
-    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, false, metrics, time);
+    private RecordAccumulator accumulator = new RecordAccumulator(batchSize, 1024 * 1024, 0L, 0L, false, metrics, time);
     private Sender sender = new Sender(selector,
                                        metadata,
                                        this.accumulator,

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b58cdcd..4deff9d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -337,7 +337,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     inLock(controllerContext.controllerLock) {
       if (config.autoLeaderRebalanceEnable)
         autoRebalanceScheduler.shutdown()
-      deleteTopicManager.shutdown()
+      if (deleteTopicManager != null)
+        deleteTopicManager.shutdown()
       Utils.unregisterMBean(KafkaController.MBeanName)
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
@@ -647,8 +648,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       if(controllerContext.controllerChannelManager != null) {
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null
-        info("Controller shutdown complete")
       }
+      info("Controller shutdown complete")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index bcd2bb7..8ed9b68 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -162,7 +162,7 @@ class LogManager(val logDirs: Array[File],
    * Close all the logs
    */
   def shutdown() {
-    debug("Shutting down.")
+    info("Shutting down.")
     try {
       // stop the cleaner first
       if(cleaner != null)
@@ -179,7 +179,7 @@ class LogManager(val logDirs: Array[File],
       // regardless of whether the close succeeded, we need to unlock the data directories
       dirLocks.foreach(_.destroy())
     }
-    debug("Shutdown complete.")
+    info("Shutdown complete.")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5ba48348/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 4b7c544..c002f5e 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -26,7 +26,7 @@ import java.lang.Integer
 import java.util.concurrent.{TimeoutException, TimeUnit, ExecutionException}
 
 import kafka.server.{KafkaConfig, KafkaServer}
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{ShutdownableThread, Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.consumer.SimpleConsumer
 
@@ -267,18 +267,73 @@ class ProducerFailureHandlingTest extends JUnit3Suite with ZooKeeperTestHarness
   def testBrokerFailure() {
     // create topic
     val leaders = TestUtils.createTopic(zkClient, topic1, 1, 2, servers)
-    val leader = leaders(0)
+    val partition = 0
+    var leader = leaders(partition)
     assertTrue("Leader of partition 0 of the topic should exist", leader.isDefined)
 
-    val record = new ProducerRecord(topic1, null, "key".getBytes, "value".getBytes)
-    assertEquals("Returned metadata should have offset 0", producer3.send(record).get.offset, 0L)
+    val scheduler = new ProducerScheduler()
+    scheduler.start
+
+    // rolling bounce brokers
+    for (i <- 0 until 5) {
+      server1.shutdown()
+      server1.awaitShutdown()
+      server1.startup
+
+      Thread.sleep(2000)
+
+      server2.shutdown()
+      server2.awaitShutdown()
+      server2.startup
+
+      Thread.sleep(2000)
 
-    // shutdown broker
-    val serverToShutdown = if(leader.get == server1.config.brokerId) server1 else server2
-    serverToShutdown.shutdown()
-    serverToShutdown.awaitShutdown()
+      assertTrue(scheduler.failed == false)
+    }
+
+    scheduler.shutdown
+    leader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, partition, 500)
+
+    val fetchResponse = if(leader.get == server1.config.brokerId) {
+      consumer1.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
+    } else {
+      consumer2.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
+    }
 
-    // send the message again, it should still succeed due-to retry
-    assertEquals("Returned metadata should have offset 1", producer3.send(record).get.offset, 1L)
+    val messages = fetchResponse.iterator.toList.map(_.message)
+    val uniqueMessages = messages.toSet
+    val uniqueMessageSize = uniqueMessages.size
+
+    assertEquals("Should have fetched " + scheduler.sent + " unique messages", scheduler.sent, uniqueMessageSize)
+  }
+
+  private class ProducerScheduler extends ShutdownableThread("daemon-producer", false)
+  {
+    val numRecords = 1000
+    var sent = 0
+    var failed = false
+
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BROKER_LIST_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.REQUIRED_ACKS_CONFIG, (-1).toString)
+    producerProps.put(ProducerConfig.TOTAL_BUFFER_MEMORY_CONFIG, bufferSize.toString)
+    producerProps.put(ProducerConfig.MAX_RETRIES_CONFIG, 10.toString)
+    producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString)
+
+    val producer = new KafkaProducer(producerProps)
+
+    override def doWork(): Unit = {
+      val responses =
+        for (i <- sent+1 to sent+numRecords)
+        yield producer.send(new ProducerRecord(topic1, null, null, i.toString.getBytes))
+      val futures = responses.toList
+
+      try {
+        futures.map(_.get)
+        sent += numRecords
+      } catch {
+        case e : Exception => failed = true
+      }
+    }
   }
 }
\ No newline at end of file